提交 1d4056f4 编写于 作者: P peterq

demo add: 公网ip机器不走web rtc, 直接使用ws

上级 745739b5
......@@ -4,6 +4,9 @@ import Vue from "vue"
import State, {dataTemplate} from "./util/state"
import {registerProxyChannelResolver} from "./lib/vnc/core/RtcWebSocket"
import whatJpg from './assets/what.jpeg'
import {setWsFactory} from "./lib/vnc/core/websock"
import RtcWebSocket from "./lib/vnc/core/RtcWebSocket"
import ProxyWebSocket from "./lib/vnc/core/ProxyWebSocket"
// const $rt = new RealTime('ws://localhost:8001/demo/ws')
......@@ -44,6 +47,50 @@ registerProxyChannelResolver(async function (uri) {
return await host.vncProxyChanel(slave, method === 'view')
})
setWsFactory(function (uri, protocols) {
uri = uri.replace('ws://', '').replace('wss://', '')
let [hostName, slave] = uri.split('/')
let host = $state.hosts.find(function (host) {
return host.name === hostName
})
if (!host)
throw new Error('host not found in host list: ' + hostName)
if (host.wsAgentUrl) {
// return new WebSocket('ws://172.17.0.2:5901', protocols)
// return proxyWs(host.wsAgentUrl + '?slave=' + slave, protocols)
return new ProxyWebSocket(host.wsAgentUrl + '?slave=' + slave, protocols)
}
new RtcWebSocket
})
function proxyWs(u, p) {
let ws = new WebSocket(u, p)
let fake = {
connected: false,
onmessage: null,
}
ws.onmessage = function (evt) {
console.log(evt.data)
ws.onmessage = fake.onmessage
}
return new Proxy(ws, {
get: function (target, key, receiver) {
return Reflect.get(target, key, receiver)
},
set: function (target, key, value, receiver) {
if (key === 'onmessage') {
if (fake.connected) {
return Reflect.set(target, key, value, receiver)
} else {
fake.onmessage = value
return true
}
}
return Reflect.set(target, key, value, receiver)
}
})
}
const connectionRequestMap = {}
console.log(process.env)
if (process.env.NODE_ENV === 'production') {
......
import Base64 from './base64.js'
// PhantomJS can't create Event objects directly, so we need to use this
function make_event(name, props) {
const evt = document.createEvent('Event')
evt.initEvent(name, true, true)
if (props) {
for (let prop in props) {
evt[prop] = props[prop]
}
}
return evt
}
export default class ProxyWebSocket {
constructor(uri, protocols) {
this._ws = new WebSocket(uri, protocols)
this._ws.onclose = (evt) => {
console.log(evt)
this.close(evt.code, evt.reason)
}
this._ws.onerror = (evt) => {
this._error()
}
this._ws.onmessage = (evt) => {
console.log(evt.data)
this._ws.onmessage = this.onmessage
this._open()
}
this._ws.onopen = (evt) => {
}
this._ws.binaryType = this.binaryType = "arraybuffer"
if (!protocols || typeof protocols === 'string') {
this.protocol = protocols
} else {
this.protocol = protocols[0]
}
this.readyState = ProxyWebSocket.CONNECTING
this.__is_fake = true
}
close(code, reason) {
this.readyState = ProxyWebSocket.CLOSED
if (this.onclose) {
this.onclose(make_event("close", {'code': code, 'reason': reason, 'wasClean': true}))
}
this._ws.close()
}
send(data) {
// console.log('rtc web socket send', data)
if (this.protocol === 'base64') {
data = Base64.decode(data)
} else {
data = new Uint8Array(data)
}
this._ws.send(data)
}
_open() {
this.readyState = ProxyWebSocket.OPEN
if (this.onopen) {
this.onopen(make_event('open'))
console.log('rtc web socket open')
}
}
_error() {
this.readyState = ProxyWebSocket.OPEN
if (this.onerror) {
this.onerror(make_event('error'))
console.log('rtc web socket error')
}
}
}
ProxyWebSocket.OPEN = WebSocket.OPEN
ProxyWebSocket.CONNECTING = WebSocket.CONNECTING
ProxyWebSocket.CLOSING = WebSocket.CLOSING
ProxyWebSocket.CLOSED = WebSocket.CLOSED
ProxyWebSocket.__is_fake = true
......@@ -84,17 +84,9 @@ export default class RtcWebSocket {
} else {
data = new Uint8Array(data)
}
// this._send_queue.set(data, this.bufferedAmount);
// this.bufferedAmount += data.length;
this.proxyChannel.send(data)
}
// _get_sent_data() {
// const res = new Uint8Array(this._send_queue.buffer, 0, this.bufferedAmount);
// this.bufferedAmount = 0;
// return res;
// }
_open() {
this.readyState = RtcWebSocket.OPEN
if (this.onopen) {
......
......@@ -21,6 +21,14 @@ import RtcWebSocket from "./RtcWebSocket"
const ENABLE_COPYWITHIN = false;
const MAX_RQ_GROW_SIZE = 40 * 1024 * 1024; // 40 MiB
let wsFactory = function (uri, protocols) {
return new WebSocket(uri, protocols)
}
export function setWsFactory(f) {
wsFactory = f
}
export default class Websock {
constructor() {
this._websocket = null; // WebSocket object
......@@ -182,7 +190,7 @@ export default class Websock {
open(uri, protocols) {
this.init();
this._websocket = new RtcWebSocket(uri, protocols);
this._websocket = wsFactory(uri, protocols);
this._websocket.binaryType = 'arraybuffer';
this._websocket.onmessage = this._recv_message.bind(this);
......
......@@ -14,10 +14,12 @@ import (
)
var host = &struct {
name string
password string
wsAddr string
slaveCount int
name string
password string
wsAddr string
slaveCount int
wsAgentPort string
wsAgentAddr string
slaves []string
......@@ -36,6 +38,8 @@ func Start() {
host.name = env("host_name")
host.password = env("host_password")
host.wsAddr = env("ws_addr")
host.wsAgentPort = env("ws_agent_port")
host.wsAgentAddr = env("ws_agent_addr")
var err error
host.slaveCount, err = strconv.Atoi(env("slave_count"))
if err != nil {
......@@ -50,6 +54,10 @@ func Start() {
}
rt.Init()
rt.RegisterEventListener(eventHandlers)
if host.wsAgentPort != "" {
startWsAgentServer()
}
select {}
}
......@@ -75,7 +83,8 @@ func startServe() {
host.slaves[i] = host.name + ".slave." + strconv.Itoa(i)
}
_, err := rt.Call("slave.register", gson{
"slaves": host.slaves,
"slaves": host.slaves,
"ws_agent_url": host.wsAgentAddr,
})
if err != nil {
log.Println("注册slave失败", err)
......
......@@ -2,8 +2,10 @@ package instance
import (
"context"
"encoding/json"
"github.com/peterq/pan-light/demo/realtime"
"github.com/pkg/errors"
"golang.org/x/net/websocket"
"io"
"log"
"net"
......@@ -223,6 +225,72 @@ func (h *Holder) VncProxy(rw io.ReadWriteCloser, proxyCb func(err error)) {
log.Println("proxy gone rw", rw)
}
func (h *Holder) WsProxy(userConn *websocket.Conn) {
defer userConn.Close()
var addr string
func() {
h.vncAddrLock.Lock()
defer h.vncAddrLock.Unlock()
for h.vncAddr == "" {
log.Println("ws proxy 等待docker ip")
h.vncAddrCond.Wait()
}
addr = h.vncAddr
}()
tcpAddr, _ := net.ResolveTCPAddr("tcp4", addr)
vncConn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
log.Printf("connection failed: %v\n", err)
bin, _ := json.Marshal(gson{
"proxyOk": false,
"message": "连接docker内部vnc出错",
})
userConn.Write(bin)
return
}
defer vncConn.Close()
bin, _ := json.Marshal(gson{
"proxyOk": true,
"message": "连接成功",
})
userConn.Write(bin)
ctx, cancel := context.WithCancel(context.Background())
var ReadLoop = func() {
for {
var msg []byte
err := websocket.Message.Receive(userConn, &msg)
if err != nil {
log.Println("ws agent read from user err:", err)
cancel()
return
}
vncConn.Write(msg)
}
}
var WriteLoop = func() {
for {
msg := make([]byte, 1024)
l, err := vncConn.Read(msg)
if err != nil {
log.Println("ws agent read from vnc err:", err)
cancel()
return
}
websocket.Message.Send(userConn, msg[:l])
}
}
log.Println("ws proxy start", h.SlaveName)
go ReadLoop()
go WriteLoop()
<-ctx.Done()
log.Println("ws proxy gone", h.SlaveName)
}
// 处理 slave 发来的消息
func (h *Holder) HandleEvent(evt string, data interface{}) {
if evt == "start.ok" {
......
......@@ -9,6 +9,7 @@ import (
"io"
"log"
"math/rand"
"strings"
"sync"
"time"
)
......@@ -47,10 +48,22 @@ func handleNewUser(cand, sessionId, requestId string) {
if err != nil {
log.Println("cand 解码错误", err)
}
t := strings.Split(remoteSd.SDP, "\n")
var t2 []string
for _, s := range t {
if strings.Contains(s, "a=candidate:") &&
strings.Contains(s, "local") {
continue
}
t2 = append(t2, s)
}
remoteSd.SDP = strings.Join(t2, "\n")
//log.Println(remoteSd)
log.Println("rtc new", sessionId)
ctx, cancel := context.WithCancel(context.Background())
peerConnection, err := webRtcApi.NewPeerConnection(webRtcConfig)
if err != nil {
log.Println(err)
log.Println("rtc connection error", err)
return
}
......@@ -98,6 +111,7 @@ func handleNewUser(cand, sessionId, requestId string) {
"requestId": requestId,
"sessionId": sessionId,
})
log.Println("rtc send answer", sessionId)
go func() {
time.Sleep(10 * time.Second)
......
package host
import (
"golang.org/x/net/websocket"
"log"
"net/http"
)
func startWsAgentServer() {
http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
writer.Write([]byte("hello ws agent"))
})
http.HandleFunc("/ws", beforeWsAgent)
log.Println("ws agent server port", host.wsAgentPort)
http.ListenAndServe(":"+host.wsAgentPort, nil)
}
func beforeWsAgent(writer http.ResponseWriter, request *http.Request) {
slaveName := request.URL.Query().Get("slave")
holder, ok := host.holderMap[slaveName]
if !ok {
writer.Write([]byte("slave not exist"))
return
}
websocket.Handler(holder.WsProxy).ServeHTTP(writer, request)
}
......@@ -71,6 +71,7 @@ var hostRpcMap = map[string]realtime.RpcHandler{
host.slaves[name.(string)] = slave
manager.slaveMap[name.(string)] = slave
}
host.wsAgentUrl = p["ws_agent_url"].(string)
return
}),
"host.hello": realtime.RpcHandleFunc(func(ss *realtime.Session, p gson) (result interface{}, err error) {
......
......@@ -12,9 +12,10 @@ type roleType interface {
}
type roleHost struct {
name string
session *realtime.Session
slaves map[string]*roleSlave
name string
session *realtime.Session
wsAgentUrl string
slaves map[string]*roleSlave
}
func (*roleHost) publicInfo() gson {
......
......@@ -19,8 +19,9 @@ var userRpcMap = map[string]realtime.RpcHandler{
})
}
arr = append(arr, gson{
"name": host.name,
"slaves": slaves,
"name": host.name,
"wsAgentUrl": host.wsAgentUrl,
"slaves": slaves,
})
}
return arr, nil
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册