From 1d4056f4614f224b1f49d4267c666cd97633b7a8 Mon Sep 17 00:00:00 2001 From: peterq Date: Mon, 1 Jul 2019 13:20:44 +0800 Subject: [PATCH] =?UTF-8?q?demo=20add:=20=E5=85=AC=E7=BD=91ip=E6=9C=BA?= =?UTF-8?q?=E5=99=A8=E4=B8=8D=E8=B5=B0web=20rtc,=20=E7=9B=B4=E6=8E=A5?= =?UTF-8?q?=E4=BD=BF=E7=94=A8ws?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- demo/demo-online-front/src/app.js | 47 ++++++++++ .../src/lib/vnc/core/ProxyWebSocket.js | 90 +++++++++++++++++++ .../src/lib/vnc/core/RtcWebSocket.js | 8 -- .../src/lib/vnc/core/websock.js | 10 ++- demo/host/host-init.go | 19 ++-- demo/host/instance/instace-holder.go | 68 ++++++++++++++ demo/host/web-rtc.go | 16 +++- demo/host/ws-agent.go | 26 ++++++ server/demo/rpc/host-handler.go | 1 + server/demo/rpc/roles.go | 7 +- server/demo/rpc/user-handler.go | 5 +- 11 files changed, 277 insertions(+), 20 deletions(-) create mode 100644 demo/demo-online-front/src/lib/vnc/core/ProxyWebSocket.js create mode 100644 demo/host/ws-agent.go diff --git a/demo/demo-online-front/src/app.js b/demo/demo-online-front/src/app.js index 2b440d8..6618f7b 100644 --- a/demo/demo-online-front/src/app.js +++ b/demo/demo-online-front/src/app.js @@ -4,6 +4,9 @@ import Vue from "vue" import State, {dataTemplate} from "./util/state" import {registerProxyChannelResolver} from "./lib/vnc/core/RtcWebSocket" import whatJpg from './assets/what.jpeg' +import {setWsFactory} from "./lib/vnc/core/websock" +import RtcWebSocket from "./lib/vnc/core/RtcWebSocket" +import ProxyWebSocket from "./lib/vnc/core/ProxyWebSocket" // const $rt = new RealTime('ws://localhost:8001/demo/ws') @@ -44,6 +47,50 @@ registerProxyChannelResolver(async function (uri) { return await host.vncProxyChanel(slave, method === 'view') }) +setWsFactory(function (uri, protocols) { + uri = uri.replace('ws://', '').replace('wss://', '') + let [hostName, slave] = uri.split('/') + let host = $state.hosts.find(function (host) { + return host.name === hostName + }) + if (!host) + throw new Error('host not found in host list: ' + hostName) + if (host.wsAgentUrl) { + // return new WebSocket('ws://172.17.0.2:5901', protocols) + // return proxyWs(host.wsAgentUrl + '?slave=' + slave, protocols) + return new ProxyWebSocket(host.wsAgentUrl + '?slave=' + slave, protocols) + } + new RtcWebSocket +}) + +function proxyWs(u, p) { + let ws = new WebSocket(u, p) + let fake = { + connected: false, + onmessage: null, + } + ws.onmessage = function (evt) { + console.log(evt.data) + ws.onmessage = fake.onmessage + } + return new Proxy(ws, { + get: function (target, key, receiver) { + return Reflect.get(target, key, receiver) + }, + set: function (target, key, value, receiver) { + if (key === 'onmessage') { + if (fake.connected) { + return Reflect.set(target, key, value, receiver) + } else { + fake.onmessage = value + return true + } + } + return Reflect.set(target, key, value, receiver) + } + }) +} + const connectionRequestMap = {} console.log(process.env) if (process.env.NODE_ENV === 'production') { diff --git a/demo/demo-online-front/src/lib/vnc/core/ProxyWebSocket.js b/demo/demo-online-front/src/lib/vnc/core/ProxyWebSocket.js new file mode 100644 index 0000000..54c2ad3 --- /dev/null +++ b/demo/demo-online-front/src/lib/vnc/core/ProxyWebSocket.js @@ -0,0 +1,90 @@ +import Base64 from './base64.js' + +// PhantomJS can't create Event objects directly, so we need to use this +function make_event(name, props) { + const evt = document.createEvent('Event') + evt.initEvent(name, true, true) + if (props) { + for (let prop in props) { + evt[prop] = props[prop] + } + } + return evt +} + +export default class ProxyWebSocket { + constructor(uri, protocols) { + + this._ws = new WebSocket(uri, protocols) + this._ws.onclose = (evt) => { + console.log(evt) + this.close(evt.code, evt.reason) + } + this._ws.onerror = (evt) => { + this._error() + } + this._ws.onmessage = (evt) => { + console.log(evt.data) + this._ws.onmessage = this.onmessage + this._open() + } + this._ws.onopen = (evt) => { + + } + this._ws.binaryType = this.binaryType = "arraybuffer" + + if (!protocols || typeof protocols === 'string') { + this.protocol = protocols + } else { + this.protocol = protocols[0] + } + this.readyState = ProxyWebSocket.CONNECTING + this.__is_fake = true + } + + + close(code, reason) { + this.readyState = ProxyWebSocket.CLOSED + if (this.onclose) { + this.onclose(make_event("close", {'code': code, 'reason': reason, 'wasClean': true})) + } + this._ws.close() + } + + send(data) { + // console.log('rtc web socket send', data) + if (this.protocol === 'base64') { + data = Base64.decode(data) + } else { + data = new Uint8Array(data) + } + this._ws.send(data) + } + + + _open() { + this.readyState = ProxyWebSocket.OPEN + if (this.onopen) { + this.onopen(make_event('open')) + console.log('rtc web socket open') + } + } + + _error() { + this.readyState = ProxyWebSocket.OPEN + if (this.onerror) { + this.onerror(make_event('error')) + console.log('rtc web socket error') + } + } + +} + + +ProxyWebSocket.OPEN = WebSocket.OPEN +ProxyWebSocket.CONNECTING = WebSocket.CONNECTING +ProxyWebSocket.CLOSING = WebSocket.CLOSING +ProxyWebSocket.CLOSED = WebSocket.CLOSED + +ProxyWebSocket.__is_fake = true + diff --git a/demo/demo-online-front/src/lib/vnc/core/RtcWebSocket.js b/demo/demo-online-front/src/lib/vnc/core/RtcWebSocket.js index c79d1e7..98f1a16 100644 --- a/demo/demo-online-front/src/lib/vnc/core/RtcWebSocket.js +++ b/demo/demo-online-front/src/lib/vnc/core/RtcWebSocket.js @@ -84,17 +84,9 @@ export default class RtcWebSocket { } else { data = new Uint8Array(data) } - // this._send_queue.set(data, this.bufferedAmount); - // this.bufferedAmount += data.length; this.proxyChannel.send(data) } - // _get_sent_data() { - // const res = new Uint8Array(this._send_queue.buffer, 0, this.bufferedAmount); - // this.bufferedAmount = 0; - // return res; - // } - _open() { this.readyState = RtcWebSocket.OPEN if (this.onopen) { diff --git a/demo/demo-online-front/src/lib/vnc/core/websock.js b/demo/demo-online-front/src/lib/vnc/core/websock.js index c6bb47d..7358785 100644 --- a/demo/demo-online-front/src/lib/vnc/core/websock.js +++ b/demo/demo-online-front/src/lib/vnc/core/websock.js @@ -21,6 +21,14 @@ import RtcWebSocket from "./RtcWebSocket" const ENABLE_COPYWITHIN = false; const MAX_RQ_GROW_SIZE = 40 * 1024 * 1024; // 40 MiB +let wsFactory = function (uri, protocols) { + return new WebSocket(uri, protocols) +} + +export function setWsFactory(f) { + wsFactory = f +} + export default class Websock { constructor() { this._websocket = null; // WebSocket object @@ -182,7 +190,7 @@ export default class Websock { open(uri, protocols) { this.init(); - this._websocket = new RtcWebSocket(uri, protocols); + this._websocket = wsFactory(uri, protocols); this._websocket.binaryType = 'arraybuffer'; this._websocket.onmessage = this._recv_message.bind(this); diff --git a/demo/host/host-init.go b/demo/host/host-init.go index 8d950a7..2f318da 100644 --- a/demo/host/host-init.go +++ b/demo/host/host-init.go @@ -14,10 +14,12 @@ import ( ) var host = &struct { - name string - password string - wsAddr string - slaveCount int + name string + password string + wsAddr string + slaveCount int + wsAgentPort string + wsAgentAddr string slaves []string @@ -36,6 +38,8 @@ func Start() { host.name = env("host_name") host.password = env("host_password") host.wsAddr = env("ws_addr") + host.wsAgentPort = env("ws_agent_port") + host.wsAgentAddr = env("ws_agent_addr") var err error host.slaveCount, err = strconv.Atoi(env("slave_count")) if err != nil { @@ -50,6 +54,10 @@ func Start() { } rt.Init() rt.RegisterEventListener(eventHandlers) + + if host.wsAgentPort != "" { + startWsAgentServer() + } select {} } @@ -75,7 +83,8 @@ func startServe() { host.slaves[i] = host.name + ".slave." + strconv.Itoa(i) } _, err := rt.Call("slave.register", gson{ - "slaves": host.slaves, + "slaves": host.slaves, + "ws_agent_url": host.wsAgentAddr, }) if err != nil { log.Println("注册slave失败", err) diff --git a/demo/host/instance/instace-holder.go b/demo/host/instance/instace-holder.go index ff26200..93d0687 100644 --- a/demo/host/instance/instace-holder.go +++ b/demo/host/instance/instace-holder.go @@ -2,8 +2,10 @@ package instance import ( "context" + "encoding/json" "github.com/peterq/pan-light/demo/realtime" "github.com/pkg/errors" + "golang.org/x/net/websocket" "io" "log" "net" @@ -223,6 +225,72 @@ func (h *Holder) VncProxy(rw io.ReadWriteCloser, proxyCb func(err error)) { log.Println("proxy gone rw", rw) } +func (h *Holder) WsProxy(userConn *websocket.Conn) { + defer userConn.Close() + var addr string + func() { + h.vncAddrLock.Lock() + defer h.vncAddrLock.Unlock() + for h.vncAddr == "" { + log.Println("ws proxy 等待docker ip") + h.vncAddrCond.Wait() + } + addr = h.vncAddr + }() + + tcpAddr, _ := net.ResolveTCPAddr("tcp4", addr) + vncConn, err := net.DialTCP("tcp", nil, tcpAddr) + if err != nil { + log.Printf("connection failed: %v\n", err) + bin, _ := json.Marshal(gson{ + "proxyOk": false, + "message": "连接docker内部vnc出错", + }) + userConn.Write(bin) + return + } + defer vncConn.Close() + + bin, _ := json.Marshal(gson{ + "proxyOk": true, + "message": "连接成功", + }) + userConn.Write(bin) + + ctx, cancel := context.WithCancel(context.Background()) + var ReadLoop = func() { + for { + var msg []byte + err := websocket.Message.Receive(userConn, &msg) + if err != nil { + log.Println("ws agent read from user err:", err) + cancel() + return + } + vncConn.Write(msg) + } + } + + var WriteLoop = func() { + for { + msg := make([]byte, 1024) + l, err := vncConn.Read(msg) + if err != nil { + log.Println("ws agent read from vnc err:", err) + cancel() + return + } + websocket.Message.Send(userConn, msg[:l]) + } + } + + log.Println("ws proxy start", h.SlaveName) + go ReadLoop() + go WriteLoop() + <-ctx.Done() + log.Println("ws proxy gone", h.SlaveName) +} + // 处理 slave 发来的消息 func (h *Holder) HandleEvent(evt string, data interface{}) { if evt == "start.ok" { diff --git a/demo/host/web-rtc.go b/demo/host/web-rtc.go index 19abb1a..f9759dc 100644 --- a/demo/host/web-rtc.go +++ b/demo/host/web-rtc.go @@ -9,6 +9,7 @@ import ( "io" "log" "math/rand" + "strings" "sync" "time" ) @@ -47,10 +48,22 @@ func handleNewUser(cand, sessionId, requestId string) { if err != nil { log.Println("cand 解码错误", err) } + t := strings.Split(remoteSd.SDP, "\n") + var t2 []string + for _, s := range t { + if strings.Contains(s, "a=candidate:") && + strings.Contains(s, "local") { + continue + } + t2 = append(t2, s) + } + remoteSd.SDP = strings.Join(t2, "\n") + //log.Println(remoteSd) + log.Println("rtc new", sessionId) ctx, cancel := context.WithCancel(context.Background()) peerConnection, err := webRtcApi.NewPeerConnection(webRtcConfig) if err != nil { - log.Println(err) + log.Println("rtc connection error", err) return } @@ -98,6 +111,7 @@ func handleNewUser(cand, sessionId, requestId string) { "requestId": requestId, "sessionId": sessionId, }) + log.Println("rtc send answer", sessionId) go func() { time.Sleep(10 * time.Second) diff --git a/demo/host/ws-agent.go b/demo/host/ws-agent.go new file mode 100644 index 0000000..c879354 --- /dev/null +++ b/demo/host/ws-agent.go @@ -0,0 +1,26 @@ +package host + +import ( + "golang.org/x/net/websocket" + "log" + "net/http" +) + +func startWsAgentServer() { + http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) { + writer.Write([]byte("hello ws agent")) + }) + http.HandleFunc("/ws", beforeWsAgent) + log.Println("ws agent server port", host.wsAgentPort) + http.ListenAndServe(":"+host.wsAgentPort, nil) +} + +func beforeWsAgent(writer http.ResponseWriter, request *http.Request) { + slaveName := request.URL.Query().Get("slave") + holder, ok := host.holderMap[slaveName] + if !ok { + writer.Write([]byte("slave not exist")) + return + } + websocket.Handler(holder.WsProxy).ServeHTTP(writer, request) +} diff --git a/server/demo/rpc/host-handler.go b/server/demo/rpc/host-handler.go index 14fdd1e..0d7e425 100644 --- a/server/demo/rpc/host-handler.go +++ b/server/demo/rpc/host-handler.go @@ -71,6 +71,7 @@ var hostRpcMap = map[string]realtime.RpcHandler{ host.slaves[name.(string)] = slave manager.slaveMap[name.(string)] = slave } + host.wsAgentUrl = p["ws_agent_url"].(string) return }), "host.hello": realtime.RpcHandleFunc(func(ss *realtime.Session, p gson) (result interface{}, err error) { diff --git a/server/demo/rpc/roles.go b/server/demo/rpc/roles.go index d0d956c..f84643d 100644 --- a/server/demo/rpc/roles.go +++ b/server/demo/rpc/roles.go @@ -12,9 +12,10 @@ type roleType interface { } type roleHost struct { - name string - session *realtime.Session - slaves map[string]*roleSlave + name string + session *realtime.Session + wsAgentUrl string + slaves map[string]*roleSlave } func (*roleHost) publicInfo() gson { diff --git a/server/demo/rpc/user-handler.go b/server/demo/rpc/user-handler.go index 3375d83..70f28f9 100644 --- a/server/demo/rpc/user-handler.go +++ b/server/demo/rpc/user-handler.go @@ -19,8 +19,9 @@ var userRpcMap = map[string]realtime.RpcHandler{ }) } arr = append(arr, gson{ - "name": host.name, - "slaves": slaves, + "name": host.name, + "wsAgentUrl": host.wsAgentUrl, + "slaves": slaves, }) } return arr, nil -- GitLab