提交 9d130b2f 编写于 作者: P peterq

fix demo

上级 cab15ba1
......@@ -26,6 +26,7 @@ github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/peterq/pan-light v0.0.0-20190630114539-5740b2e61fc8 h1:l9G3+foZSQM1wexOK1inl7TGWJxzlEWCzgfLN/vNwEE=
github.com/pion/datachannel v1.3.0 h1:gxt/xGufDn8Yylk0uJB231xbGQVlFjVps+KdUAUl5Ls=
github.com/pion/datachannel v1.3.0/go.mod h1:lxFbZLIT+EBPmy5AiCv8M0CXkcuTL53A4cyagZiRrDo=
github.com/pion/dtls v1.3.0 h1:5jcC5bBzRcLfxmUH60zp/slIe/tjCLmz6AUZagPYmhA=
......
......@@ -125,6 +125,7 @@ func (h *Holder) startIns() {
defer exec.Command("docker", "rm", "-v", "-f", h.containerName).Run()
exec.Command("docker", "rm", "-v", "-f", h.containerName).Run()
e, _ := filepath.Abs("./slave/ubuntu16.04/root.pan-light")
log.Println(e)
// 启动docker
dockerP := exec.Command("docker", "run",
"-m", "400m", "--memory-swap", "500m", // 400m 内存
......@@ -134,7 +135,7 @@ func (h *Holder) startIns() {
"-e", "slave_name="+h.SlaveName, "-e", "ws_addr="+h.WsAddr, // ws 地址
"-e", "demo_order="+strconv.FormatInt(h.order, 10), // demo order
"-e", "demo_user="+h.sessionId, // 用户session
"-v"+e+":/root/pan-light", // 开发时文件映射, 正式环境使用docker copy
//"-v"+e+":/root/pan-light", // 开发时文件映射, 正式环境使用docker copy
"--name="+h.containerName+"", // 容器名
dockerImage)
defer exec.Command("docker", "kill", h.containerName)
......
......@@ -37,7 +37,8 @@ type RealTime struct {
sessionSecret string
listenerMap map[string][]func(data interface{}, room string)
callMap map[float64]chan<- *callResult
callMap sync.Map
//callMap map[float64]chan<- *callResult
callMapLock sync.Mutex
logWsMsg bool
}
......@@ -49,7 +50,21 @@ func (rt *RealTime) Init() {
rt.inited = true
rt.connectOkCond = sync.NewCond(&rt.connectLock)
rt.listenerMap = map[string][]func(data interface{}, room string){}
rt.callMap = map[float64]chan<- *callResult{}
go func() {
for range time.Tick(10 * time.Second) {
rt.Call("ping", gson{})
}
}()
rt.RegisterEventListener(map[string]func(data interface{}, room string){
"session.new": func(data interface{}, room string) {
rt.sessionId = data.(gson)["id"].(string)
rt.sessionSecret = data.(gson)["id"].(string)
log.Println("rt 会话创建成功")
},
"session.resume": func(data interface{}, room string) {
log.Println("rt 会话恢复成功")
},
})
go rt.connect()
}
......@@ -67,27 +82,28 @@ func (rt *RealTime) connect() {
go func() {
for {
rt.connectLock.Lock()
for rt.connectOK != true {
for rt.connectOK != true { // 等待连接ok
rt.connectOkCond.Wait()
}
rt.connectLock.Unlock()
if rt.OnConnected != nil {
go rt.OnConnected()
}
log.Println("ws连接成功")
rt.readLoop()
}
}()
first := true
for {
func() {
func() { // 等待链接不ok, 进行连接
rt.connectLock.Lock()
for rt.connectOK != false {
rt.connectOkCond.Wait()
}
defer func() {
rt.connectLock.Unlock()
rt.connectOkCond.Broadcast()
}()
for rt.connectOK != false {
rt.connectOkCond.Wait()
}
if !first {
time.Sleep(5 * time.Second)
}
......@@ -102,12 +118,14 @@ func (rt *RealTime) connect() {
rt.conn = conn
rt.connectOK = true
if rt.sessionId != "" {
log.Println("will resume session")
err = rt.write(gson{
"type": "session.resume",
"sessionId": rt.sessionId,
"sessionSecret": rt.sessionSecret,
})
} else {
log.Println("will new session")
err = rt.write(gson{
"type": "session.new",
})
......@@ -119,7 +137,7 @@ func (rt *RealTime) connect() {
})
}
if err != nil {
log.Println("write error")
log.Println("write error", err)
rt.connectOK = false
return
}
......@@ -158,7 +176,7 @@ func (rt *RealTime) handleMsg(data gson) {
return
}
for _, cb := range cbs {
go func() {
go func(cb func(data interface{}, room string)) {
defer func() {
if e := recover(); e != nil {
log.Println(e)
......@@ -166,13 +184,13 @@ func (rt *RealTime) handleMsg(data gson) {
}
}()
cb(data["payload"], room.(string))
}()
}(cb)
}
return
}
if t == "call.result" {
id := data["id"].(float64)
ch, ok := rt.callMap[id]
ch, ok := rt.callMap.Load(id)
if !ok {
return
}
......@@ -186,7 +204,7 @@ func (rt *RealTime) handleMsg(data gson) {
ret.err = errors.New(data["error"].(string))
ret.ret = nil
}
ch <- ret
ch.(chan *callResult) <- ret
return
}
......@@ -214,7 +232,7 @@ func (rt *RealTime) Call(method string, param gson) (result interface{}, err err
ch := make(chan *callResult)
rt.callMapLock.Lock()
rt.callMap[id] = ch
rt.callMap.Store(id, ch)
rt.callMapLock.Unlock()
rt.write(gson{
......@@ -229,7 +247,7 @@ func (rt *RealTime) Call(method string, param gson) (result interface{}, err err
close(ch)
rt.callMapLock.Lock()
delete(rt.callMap, id)
rt.callMap.Delete(id)
rt.callMapLock.Unlock()
return
......@@ -248,6 +266,11 @@ func (rt *RealTime) read() (data gson, err error) {
}
func (rt *RealTime) write(data gson) (err error) {
if !rt.connectOK {
return errors.New("connect not ok")
}
if rt.logWsMsg {
log.Println("ws ->", data)
}
......
......@@ -16,6 +16,7 @@ type executor struct {
slaveName string
userSessionId string
order int64
rtOkCh chan bool
}
func (e *executor) startX() {
......@@ -24,6 +25,7 @@ func (e *executor) startX() {
})
log.Println("set password")
vnc_password.SetPassword(env("vnc_operate_pwd"), env("vnc_view_pwd"))
<-e.rtOkCh
e.notifyHost("start.ok", gson{})
startTime := time.Now()
endTime := startTime.Add(5 * time.Minute)
......
......@@ -7,12 +7,15 @@ import (
"os/exec"
"path/filepath"
"strconv"
"sync"
)
func Start() {
log.Println("hello pan light, real_time connecting")
log.SetFlags(log.LstdFlags | log.Lshortfile)
log.SetPrefix(env("slave_name") + " ")
pwd := env("host_password")
os.Unsetenv("host_password")
rt = &realtime.RealTime{
WsAddr: env("ws_addr"),
Role: "slave",
......@@ -21,14 +24,25 @@ func Start() {
SlaveName: env("slave_name"),
OnConnected: nil,
}
rt.Init()
order, _ := strconv.ParseInt(env("demo_order"), 10, 64)
exe = &executor{
hostName: rt.HostName,
slaveName: rt.SlaveName,
order: order,
userSessionId: env("demo_user"),
order: order,
rtOkCh: make(chan bool, 1),
}
log.Println("hello pan light, real_time connecting")
once := sync.Once{}
rt.Init()
rt.RegisterEventListener(map[string]func(data interface{}, room string){
"session.new": func(data interface{}, room string) {
once.Do(func() {
exe.rtOkCh <- true
})
},
})
exe.startX()
}
......
......@@ -91,7 +91,8 @@ func demoAvatar() {
}
func buildDockerSlave() {
runCmd("demo", "docker", "build", "-t", "pan-light-slave", "./slave/ubuntu16.04")
runCmd("demo", "go", "build", "-o", "./slave/ubuntu16.04/root.pan-light/demo_instance_manager", "slave.go")
//runCmd("demo", "docker", "build", "-t", "pan-light-slave", "./slave/ubuntu16.04")
}
func demoHost() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册