proxy.go 14.7 KB
Newer Older
F
fatedier 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

F
fatedier 已提交
15
package proxy
F
fatedier 已提交
16 17

import (
F
fatedier 已提交
18
	"bytes"
F
fatedier 已提交
19
	"context"
F
fatedier 已提交
20 21
	"fmt"
	"io"
F
fatedier 已提交
22
	"io/ioutil"
F
fatedier 已提交
23
	"net"
F
fatedier 已提交
24 25
	"strconv"
	"strings"
F
fatedier 已提交
26
	"sync"
27
	"time"
F
fatedier 已提交
28 29

	"github.com/fatedier/frp/models/config"
F
fatedier 已提交
30
	"github.com/fatedier/frp/models/msg"
31
	"github.com/fatedier/frp/models/plugin"
F
fatedier 已提交
32
	"github.com/fatedier/frp/models/proto/udp"
F
fatedier 已提交
33
	"github.com/fatedier/frp/utils/limit"
F
fatedier 已提交
34
	frpNet "github.com/fatedier/frp/utils/net"
F
fatedier 已提交
35
	"github.com/fatedier/frp/utils/xlog"
F
fatedier 已提交
36 37

	"github.com/fatedier/golib/errors"
F
fatedier 已提交
38
	frpIo "github.com/fatedier/golib/io"
F
fatedier 已提交
39
	"github.com/fatedier/golib/pool"
F
fatedier 已提交
40
	fmux "github.com/hashicorp/yamux"
F
fatedier 已提交
41
	pp "github.com/pires/go-proxyproto"
F
fatedier 已提交
42
	"golang.org/x/time/rate"
F
fatedier 已提交
43 44
)

F
fatedier 已提交
45
// Proxy defines how to handle work connections for different proxy type.
F
fatedier 已提交
46
type Proxy interface {
F
fatedier 已提交
47
	Run() error
F
fatedier 已提交
48 49

	// InWorkConn accept work connections registered to server.
F
fatedier 已提交
50
	InWorkConn(net.Conn, *msg.StartWorkConn)
F
fatedier 已提交
51

F
fatedier 已提交
52 53 54
	Close()
}

F
fatedier 已提交
55
func NewProxy(ctx context.Context, pxyConf config.ProxyConf, clientCfg config.ClientCommonConf, serverUDPPort int) (pxy Proxy) {
F
fatedier 已提交
56
	var limiter *rate.Limiter
F
fatedier 已提交
57
	limitBytes := pxyConf.GetBaseInfo().BandwidthLimit.Bytes()
F
fatedier 已提交
58 59 60 61
	if limitBytes > 0 {
		limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes))
	}

F
fatedier 已提交
62
	baseProxy := BaseProxy{
63 64
		clientCfg:     clientCfg,
		serverUDPPort: serverUDPPort,
F
fatedier 已提交
65
		limiter:       limiter,
F
fatedier 已提交
66 67
		xl:            xlog.FromContextSafe(ctx),
		ctx:           ctx,
F
fatedier 已提交
68
	}
F
fatedier 已提交
69 70 71
	switch cfg := pxyConf.(type) {
	case *config.TcpProxyConf:
		pxy = &TcpProxy{
F
go vet  
fatedier 已提交
72
			BaseProxy: &baseProxy,
F
fatedier 已提交
73
			cfg:       cfg,
F
fatedier 已提交
74 75 76
		}
	case *config.UdpProxyConf:
		pxy = &UdpProxy{
F
go vet  
fatedier 已提交
77
			BaseProxy: &baseProxy,
F
fatedier 已提交
78
			cfg:       cfg,
F
fatedier 已提交
79 80 81
		}
	case *config.HttpProxyConf:
		pxy = &HttpProxy{
F
go vet  
fatedier 已提交
82
			BaseProxy: &baseProxy,
F
fatedier 已提交
83
			cfg:       cfg,
F
fatedier 已提交
84 85 86
		}
	case *config.HttpsProxyConf:
		pxy = &HttpsProxy{
F
go vet  
fatedier 已提交
87
			BaseProxy: &baseProxy,
F
fatedier 已提交
88
			cfg:       cfg,
F
fatedier 已提交
89
		}
F
fatedier 已提交
90 91
	case *config.StcpProxyConf:
		pxy = &StcpProxy{
F
go vet  
fatedier 已提交
92
			BaseProxy: &baseProxy,
F
fatedier 已提交
93 94
			cfg:       cfg,
		}
F
fatedier 已提交
95 96
	case *config.XtcpProxyConf:
		pxy = &XtcpProxy{
F
go vet  
fatedier 已提交
97
			BaseProxy: &baseProxy,
F
fatedier 已提交
98 99
			cfg:       cfg,
		}
F
fatedier 已提交
100 101 102 103
	}
	return
}

F
fatedier 已提交
104
type BaseProxy struct {
105 106 107
	closed        bool
	clientCfg     config.ClientCommonConf
	serverUDPPort int
F
fatedier 已提交
108
	limiter       *rate.Limiter
F
fatedier 已提交
109 110 111 112

	mu  sync.RWMutex
	xl  *xlog.Logger
	ctx context.Context
F
fatedier 已提交
113 114
}

F
fatedier 已提交
115 116
// TCP
type TcpProxy struct {
F
go vet  
fatedier 已提交
117
	*BaseProxy
F
fatedier 已提交
118

119 120
	cfg         *config.TcpProxyConf
	proxyPlugin plugin.Plugin
F
fatedier 已提交
121 122
}

F
fatedier 已提交
123
func (pxy *TcpProxy) Run() (err error) {
124 125 126 127 128 129
	if pxy.cfg.Plugin != "" {
		pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
		if err != nil {
			return
		}
	}
F
fatedier 已提交
130
	return
F
fatedier 已提交
131 132 133
}

func (pxy *TcpProxy) Close() {
134 135 136
	if pxy.proxyPlugin != nil {
		pxy.proxyPlugin.Close()
	}
F
fatedier 已提交
137 138
}

F
fatedier 已提交
139
func (pxy *TcpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
F
fatedier 已提交
140 141
	HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter,
		conn, []byte(pxy.clientCfg.Token), m)
F
fatedier 已提交
142 143 144 145
}

// HTTP
type HttpProxy struct {
F
go vet  
fatedier 已提交
146
	*BaseProxy
F
fatedier 已提交
147

148 149
	cfg         *config.HttpProxyConf
	proxyPlugin plugin.Plugin
F
fatedier 已提交
150 151
}

F
fatedier 已提交
152
func (pxy *HttpProxy) Run() (err error) {
153 154 155 156 157 158
	if pxy.cfg.Plugin != "" {
		pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
		if err != nil {
			return
		}
	}
F
fatedier 已提交
159
	return
F
fatedier 已提交
160 161 162
}

func (pxy *HttpProxy) Close() {
163 164 165
	if pxy.proxyPlugin != nil {
		pxy.proxyPlugin.Close()
	}
F
fatedier 已提交
166 167
}

F
fatedier 已提交
168
func (pxy *HttpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
F
fatedier 已提交
169 170
	HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter,
		conn, []byte(pxy.clientCfg.Token), m)
F
fatedier 已提交
171 172 173 174
}

// HTTPS
type HttpsProxy struct {
F
go vet  
fatedier 已提交
175
	*BaseProxy
F
fatedier 已提交
176

177 178
	cfg         *config.HttpsProxyConf
	proxyPlugin plugin.Plugin
F
fatedier 已提交
179 180
}

F
fatedier 已提交
181
func (pxy *HttpsProxy) Run() (err error) {
182 183 184 185 186 187
	if pxy.cfg.Plugin != "" {
		pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
		if err != nil {
			return
		}
	}
F
fatedier 已提交
188
	return
F
fatedier 已提交
189 190 191
}

func (pxy *HttpsProxy) Close() {
192 193 194
	if pxy.proxyPlugin != nil {
		pxy.proxyPlugin.Close()
	}
F
fatedier 已提交
195 196
}

F
fatedier 已提交
197
func (pxy *HttpsProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
F
fatedier 已提交
198 199
	HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter,
		conn, []byte(pxy.clientCfg.Token), m)
F
fatedier 已提交
200 201
}

F
fatedier 已提交
202 203
// STCP
type StcpProxy struct {
F
go vet  
fatedier 已提交
204
	*BaseProxy
F
fatedier 已提交
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225

	cfg         *config.StcpProxyConf
	proxyPlugin plugin.Plugin
}

func (pxy *StcpProxy) Run() (err error) {
	if pxy.cfg.Plugin != "" {
		pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
		if err != nil {
			return
		}
	}
	return
}

func (pxy *StcpProxy) Close() {
	if pxy.proxyPlugin != nil {
		pxy.proxyPlugin.Close()
	}
}

F
fatedier 已提交
226
func (pxy *StcpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
F
fatedier 已提交
227 228
	HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter,
		conn, []byte(pxy.clientCfg.Token), m)
F
fatedier 已提交
229 230
}

F
fatedier 已提交
231 232
// XTCP
type XtcpProxy struct {
F
go vet  
fatedier 已提交
233
	*BaseProxy
F
fatedier 已提交
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254

	cfg         *config.XtcpProxyConf
	proxyPlugin plugin.Plugin
}

func (pxy *XtcpProxy) Run() (err error) {
	if pxy.cfg.Plugin != "" {
		pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
		if err != nil {
			return
		}
	}
	return
}

func (pxy *XtcpProxy) Close() {
	if pxy.proxyPlugin != nil {
		pxy.proxyPlugin.Close()
	}
}

F
fatedier 已提交
255 256
func (pxy *XtcpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
	xl := pxy.xl
F
fatedier 已提交
257 258 259 260
	defer conn.Close()
	var natHoleSidMsg msg.NatHoleSid
	err := msg.ReadMsgInto(conn, &natHoleSidMsg)
	if err != nil {
F
fatedier 已提交
261
		xl.Error("xtcp read from workConn error: %v", err)
F
fatedier 已提交
262 263 264 265 266 267 268 269
		return
	}

	natHoleClientMsg := &msg.NatHoleClient{
		ProxyName: pxy.cfg.ProxyName,
		Sid:       natHoleSidMsg.Sid,
	}
	raddr, _ := net.ResolveUDPAddr("udp",
270
		fmt.Sprintf("%s:%d", pxy.clientCfg.ServerAddr, pxy.serverUDPPort))
F
fatedier 已提交
271 272 273 274 275
	clientConn, err := net.DialUDP("udp", nil, raddr)
	defer clientConn.Close()

	err = msg.WriteMsg(clientConn, natHoleClientMsg)
	if err != nil {
F
fatedier 已提交
276
		xl.Error("send natHoleClientMsg to server error: %v", err)
F
fatedier 已提交
277 278 279
		return
	}

F
fatedier 已提交
280
	// Wait for client address at most 5 seconds.
F
fatedier 已提交
281
	var natHoleRespMsg msg.NatHoleResp
F
fatedier 已提交
282 283 284 285 286
	clientConn.SetReadDeadline(time.Now().Add(5 * time.Second))

	buf := pool.GetBuf(1024)
	n, err := clientConn.Read(buf)
	if err != nil {
F
fatedier 已提交
287
		xl.Error("get natHoleRespMsg error: %v", err)
F
fatedier 已提交
288 289 290
		return
	}
	err = msg.ReadMsgInto(bytes.NewReader(buf[:n]), &natHoleRespMsg)
F
fatedier 已提交
291
	if err != nil {
F
fatedier 已提交
292
		xl.Error("get natHoleRespMsg error: %v", err)
F
fatedier 已提交
293 294 295 296
		return
	}
	clientConn.SetReadDeadline(time.Time{})
	clientConn.Close()
297 298

	if natHoleRespMsg.Error != "" {
F
fatedier 已提交
299
		xl.Error("natHoleRespMsg get error info: %s", natHoleRespMsg.Error)
300 301 302
		return
	}

F
fatedier 已提交
303
	xl.Trace("get natHoleRespMsg, sid [%s], client address [%s] visitor address [%s]", natHoleRespMsg.Sid, natHoleRespMsg.ClientAddr, natHoleRespMsg.VisitorAddr)
F
fatedier 已提交
304

F
fatedier 已提交
305 306 307
	// Send detect message
	array := strings.Split(natHoleRespMsg.VisitorAddr, ":")
	if len(array) <= 1 {
F
fatedier 已提交
308
		xl.Error("get NatHoleResp visitor address error: %v", natHoleRespMsg.VisitorAddr)
F
fatedier 已提交
309
	}
F
fatedier 已提交
310
	laddr, _ := net.ResolveUDPAddr("udp", clientConn.LocalAddr().String())
F
fatedier 已提交
311 312 313 314 315 316 317
	/*
		for i := 1000; i < 65000; i++ {
			pxy.sendDetectMsg(array[0], int64(i), laddr, "a")
		}
	*/
	port, err := strconv.ParseInt(array[1], 10, 64)
	if err != nil {
F
fatedier 已提交
318
		xl.Error("get natHoleResp visitor address error: %v", natHoleRespMsg.VisitorAddr)
F
fatedier 已提交
319 320 321
		return
	}
	pxy.sendDetectMsg(array[0], int(port), laddr, []byte(natHoleRespMsg.Sid))
F
fatedier 已提交
322
	xl.Trace("send all detect msg done")
F
fatedier 已提交
323 324 325 326 327

	msg.WriteMsg(conn, &msg.NatHoleClientDetectOK{})

	// Listen for clientConn's address and wait for visitor connection
	lConn, err := net.ListenUDP("udp", laddr)
F
fatedier 已提交
328
	if err != nil {
F
fatedier 已提交
329
		xl.Error("listen on visitorConn's local adress error: %v", err)
F
fatedier 已提交
330 331
		return
	}
F
fatedier 已提交
332
	defer lConn.Close()
F
fatedier 已提交
333

F
fatedier 已提交
334 335 336 337
	lConn.SetReadDeadline(time.Now().Add(8 * time.Second))
	sidBuf := pool.GetBuf(1024)
	var uAddr *net.UDPAddr
	n, uAddr, err = lConn.ReadFromUDP(sidBuf)
F
fatedier 已提交
338
	if err != nil {
F
fatedier 已提交
339
		xl.Warn("get sid from visitor error: %v", err)
F
fatedier 已提交
340 341
		return
	}
F
fatedier 已提交
342 343
	lConn.SetReadDeadline(time.Time{})
	if string(sidBuf[:n]) != natHoleRespMsg.Sid {
F
fatedier 已提交
344
		xl.Warn("incorrect sid from visitor")
F
fatedier 已提交
345 346 347
		return
	}
	pool.PutBuf(sidBuf)
F
fatedier 已提交
348
	xl.Info("nat hole connection make success, sid [%s]", natHoleRespMsg.Sid)
F
fatedier 已提交
349 350

	lConn.WriteToUDP(sidBuf[:n], uAddr)
F
fatedier 已提交
351

F
fatedier 已提交
352
	kcpConn, err := frpNet.NewKcpConnFromUdp(lConn, false, natHoleRespMsg.VisitorAddr)
F
fatedier 已提交
353
	if err != nil {
F
fatedier 已提交
354
		xl.Error("create kcp connection from udp connection error: %v", err)
F
fatedier 已提交
355 356 357
		return
	}

F
fatedier 已提交
358 359 360 361 362
	fmuxCfg := fmux.DefaultConfig()
	fmuxCfg.KeepAliveInterval = 5 * time.Second
	fmuxCfg.LogOutput = ioutil.Discard
	sess, err := fmux.Server(kcpConn, fmuxCfg)
	if err != nil {
F
fatedier 已提交
363
		xl.Error("create yamux server from kcp connection error: %v", err)
F
fatedier 已提交
364 365 366 367 368
		return
	}
	defer sess.Close()
	muxConn, err := sess.Accept()
	if err != nil {
F
fatedier 已提交
369
		xl.Error("accept for yamux connection error: %v", err)
F
fatedier 已提交
370 371 372
		return
	}

F
fatedier 已提交
373
	HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter,
F
fatedier 已提交
374
		muxConn, []byte(pxy.cfg.Sk), m)
F
fatedier 已提交
375 376
}

F
fatedier 已提交
377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395
func (pxy *XtcpProxy) sendDetectMsg(addr string, port int, laddr *net.UDPAddr, content []byte) (err error) {
	daddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", addr, port))
	if err != nil {
		return err
	}

	tConn, err := net.DialUDP("udp", laddr, daddr)
	if err != nil {
		return err
	}

	//uConn := ipv4.NewConn(tConn)
	//uConn.SetTTL(3)

	tConn.Write(content)
	tConn.Close()
	return nil
}

F
fatedier 已提交
396 397
// UDP
type UdpProxy struct {
F
go vet  
fatedier 已提交
398
	*BaseProxy
F
fatedier 已提交
399

F
fatedier 已提交
400
	cfg *config.UdpProxyConf
F
fatedier 已提交
401 402 403

	localAddr *net.UDPAddr
	readCh    chan *msg.UdpPacket
404 405 406

	// include msg.UdpPacket and msg.Ping
	sendCh   chan msg.Message
F
fatedier 已提交
407
	workConn net.Conn
F
fatedier 已提交
408 409
}

F
fatedier 已提交
410
func (pxy *UdpProxy) Run() (err error) {
F
fatedier 已提交
411 412 413 414
	pxy.localAddr, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", pxy.cfg.LocalIp, pxy.cfg.LocalPort))
	if err != nil {
		return
	}
F
fatedier 已提交
415
	return
F
fatedier 已提交
416 417 418
}

func (pxy *UdpProxy) Close() {
F
fatedier 已提交
419 420 421 422 423 424 425 426 427 428 429 430 431 432 433
	pxy.mu.Lock()
	defer pxy.mu.Unlock()

	if !pxy.closed {
		pxy.closed = true
		if pxy.workConn != nil {
			pxy.workConn.Close()
		}
		if pxy.readCh != nil {
			close(pxy.readCh)
		}
		if pxy.sendCh != nil {
			close(pxy.sendCh)
		}
	}
F
fatedier 已提交
434 435
}

F
fatedier 已提交
436 437 438
func (pxy *UdpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
	xl := pxy.xl
	xl.Info("incoming a new work connection for udp proxy, %s", conn.RemoteAddr().String())
F
fatedier 已提交
439 440 441
	// close resources releated with old workConn
	pxy.Close()

F
fatedier 已提交
442 443 444 445 446 447 448
	if pxy.limiter != nil {
		rwc := frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
			return conn.Close()
		})
		conn = frpNet.WrapReadWriteCloserToConn(rwc, conn)
	}

F
fatedier 已提交
449
	pxy.mu.Lock()
F
fatedier 已提交
450
	pxy.workConn = conn
451 452
	pxy.readCh = make(chan *msg.UdpPacket, 1024)
	pxy.sendCh = make(chan msg.Message, 1024)
F
fatedier 已提交
453 454
	pxy.closed = false
	pxy.mu.Unlock()
F
fatedier 已提交
455

456
	workConnReaderFn := func(conn net.Conn, readCh chan *msg.UdpPacket) {
F
fatedier 已提交
457 458 459
		for {
			var udpMsg msg.UdpPacket
			if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil {
F
fatedier 已提交
460
				xl.Warn("read from workConn for udp error: %v", errRet)
F
fatedier 已提交
461 462 463
				return
			}
			if errRet := errors.PanicToError(func() {
F
fatedier 已提交
464
				xl.Trace("get udp package from workConn: %s", udpMsg.Content)
465
				readCh <- &udpMsg
F
fatedier 已提交
466
			}); errRet != nil {
F
fatedier 已提交
467
				xl.Info("reader goroutine for udp work connection closed: %v", errRet)
F
fatedier 已提交
468 469 470 471
				return
			}
		}
	}
472 473
	workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) {
		defer func() {
F
fatedier 已提交
474
			xl.Info("writer goroutine for udp work connection closed")
475
		}()
F
fatedier 已提交
476
		var errRet error
477 478 479
		for rawMsg := range sendCh {
			switch m := rawMsg.(type) {
			case *msg.UdpPacket:
F
fatedier 已提交
480
				xl.Trace("send udp package to workConn: %s", m.Content)
481
			case *msg.Ping:
F
fatedier 已提交
482
				xl.Trace("send ping message to udp workConn")
483 484
			}
			if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil {
F
fatedier 已提交
485
				xl.Error("udp work write error: %v", errRet)
F
fatedier 已提交
486 487 488 489
				return
			}
		}
	}
490 491 492 493 494 495 496
	heartbeatFn := func(conn net.Conn, sendCh chan msg.Message) {
		var errRet error
		for {
			time.Sleep(time.Duration(30) * time.Second)
			if errRet = errors.PanicToError(func() {
				sendCh <- &msg.Ping{}
			}); errRet != nil {
F
fatedier 已提交
497
				xl.Trace("heartbeat goroutine for udp work connection closed")
F
update  
fatedier 已提交
498
				break
499 500 501
			}
		}
	}
F
fatedier 已提交
502

503 504 505
	go workConnSenderFn(pxy.workConn, pxy.sendCh)
	go workConnReaderFn(pxy.workConn, pxy.readCh)
	go heartbeatFn(pxy.workConn, pxy.sendCh)
F
fatedier 已提交
506
	udp.Forwarder(pxy.localAddr, pxy.readCh, pxy.sendCh)
F
fatedier 已提交
507 508 509
}

// Common handler for tcp work connections.
F
fatedier 已提交
510
func HandleTcpWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin,
F
fatedier 已提交
511
	baseInfo *config.BaseProxyConf, limiter *rate.Limiter, workConn net.Conn, encKey []byte, m *msg.StartWorkConn) {
F
fatedier 已提交
512
	xl := xlog.FromContextSafe(ctx)
513 514 515 516
	var (
		remote io.ReadWriteCloser
		err    error
	)
F
fatedier 已提交
517
	remote = workConn
F
fatedier 已提交
518 519 520 521 522
	if limiter != nil {
		remote = frpIo.WrapReadWriteCloser(limit.NewReader(workConn, limiter), limit.NewWriter(workConn, limiter), func() error {
			return workConn.Close()
		})
	}
F
fatedier 已提交
523

F
fatedier 已提交
524 525
	xl.Trace("handle tcp work connection, use_encryption: %t, use_compression: %t",
		baseInfo.UseEncryption, baseInfo.UseCompression)
F
fatedier 已提交
526
	if baseInfo.UseEncryption {
F
fatedier 已提交
527
		remote, err = frpIo.WithEncryption(remote, encKey)
F
fatedier 已提交
528
		if err != nil {
F
fatedier 已提交
529
			workConn.Close()
F
fatedier 已提交
530
			xl.Error("create encryption stream error: %v", err)
F
fatedier 已提交
531 532 533 534
			return
		}
	}
	if baseInfo.UseCompression {
F
fatedier 已提交
535
		remote = frpIo.WithCompression(remote)
F
fatedier 已提交
536
	}
537

538 539 540 541 542 543 544 545 546 547 548 549 550 551 552
	// check if we need to send proxy protocol info
	var extraInfo []byte
	if baseInfo.ProxyProtocolVersion != "" {
		if m.SrcAddr != "" && m.SrcPort != 0 {
			if m.DstAddr == "" {
				m.DstAddr = "127.0.0.1"
			}
			h := &pp.Header{
				Command:            pp.PROXY,
				SourceAddress:      net.ParseIP(m.SrcAddr),
				SourcePort:         m.SrcPort,
				DestinationAddress: net.ParseIP(m.DstAddr),
				DestinationPort:    m.DstPort,
			}

553
			if strings.Contains(m.SrcAddr, ".") {
554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570
				h.TransportProtocol = pp.TCPv4
			} else {
				h.TransportProtocol = pp.TCPv6
			}

			if baseInfo.ProxyProtocolVersion == "v1" {
				h.Version = 1
			} else if baseInfo.ProxyProtocolVersion == "v2" {
				h.Version = 2
			}

			buf := bytes.NewBuffer(nil)
			h.WriteTo(buf)
			extraInfo = buf.Bytes()
		}
	}

571 572
	if proxyPlugin != nil {
		// if plugin is set, let plugin handle connections first
F
fatedier 已提交
573
		xl.Debug("handle by plugin: %s", proxyPlugin.Name())
574
		proxyPlugin.Handle(remote, workConn, extraInfo)
F
fatedier 已提交
575
		xl.Debug("handle by plugin finished")
576 577
		return
	} else {
F
fatedier 已提交
578
		localConn, err := frpNet.ConnectServer("tcp", fmt.Sprintf("%s:%d", localInfo.LocalIp, localInfo.LocalPort))
579
		if err != nil {
F
fatedier 已提交
580
			workConn.Close()
F
fatedier 已提交
581
			xl.Error("connect to local service [%s:%d] error: %v", localInfo.LocalIp, localInfo.LocalPort, err)
582 583 584
			return
		}

F
fatedier 已提交
585
		xl.Debug("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(),
586
			localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String())
F
fatedier 已提交
587

588 589
		if len(extraInfo) > 0 {
			localConn.Write(extraInfo)
F
fatedier 已提交
590 591
		}

F
fatedier 已提交
592
		frpIo.Join(localConn, remote)
F
fatedier 已提交
593
		xl.Debug("join connections closed")
594
	}
F
fatedier 已提交
595
}