control.go 13.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

F
fatedier 已提交
15 16 17
package server

import (
F
fatedier 已提交
18
	"context"
F
fatedier 已提交
19 20
	"fmt"
	"io"
F
fatedier 已提交
21
	"net"
F
fatedier 已提交
22
	"runtime/debug"
F
fatedier 已提交
23 24 25
	"sync"
	"time"

F
fatedier 已提交
26 27 28 29 30 31 32 33 34
	"github.com/fatedier/frp/pkg/auth"
	"github.com/fatedier/frp/pkg/config"
	"github.com/fatedier/frp/pkg/consts"
	frpErr "github.com/fatedier/frp/pkg/errors"
	"github.com/fatedier/frp/pkg/msg"
	plugin "github.com/fatedier/frp/pkg/plugin/server"
	"github.com/fatedier/frp/pkg/util/util"
	"github.com/fatedier/frp/pkg/util/version"
	"github.com/fatedier/frp/pkg/util/xlog"
F
fatedier 已提交
35
	"github.com/fatedier/frp/server/controller"
36
	"github.com/fatedier/frp/server/metrics"
F
fatedier 已提交
37
	"github.com/fatedier/frp/server/proxy"
F
fatedier 已提交
38

F
fatedier 已提交
39
	"github.com/fatedier/golib/control/shutdown"
F
fatedier 已提交
40 41
	"github.com/fatedier/golib/crypto"
	"github.com/fatedier/golib/errors"
F
fatedier 已提交
42 43
)

F
fatedier 已提交
44 45
type ControlManager struct {
	// controls indexed by run id
F
fatedier 已提交
46
	ctlsByRunID map[string]*Control
F
fatedier 已提交
47 48 49 50 51 52

	mu sync.RWMutex
}

func NewControlManager() *ControlManager {
	return &ControlManager{
F
fatedier 已提交
53
		ctlsByRunID: make(map[string]*Control),
F
fatedier 已提交
54 55 56
	}
}

F
fatedier 已提交
57
func (cm *ControlManager) Add(runID string, ctl *Control) (oldCtl *Control) {
F
fatedier 已提交
58 59 60
	cm.mu.Lock()
	defer cm.mu.Unlock()

F
fatedier 已提交
61
	oldCtl, ok := cm.ctlsByRunID[runID]
F
fatedier 已提交
62 63 64
	if ok {
		oldCtl.Replaced(ctl)
	}
F
fatedier 已提交
65
	cm.ctlsByRunID[runID] = ctl
F
fatedier 已提交
66 67 68
	return
}

F
fatedier 已提交
69
// we should make sure if it's the same control to prevent delete a new one
F
fatedier 已提交
70
func (cm *ControlManager) Del(runID string, ctl *Control) {
F
fatedier 已提交
71 72
	cm.mu.Lock()
	defer cm.mu.Unlock()
F
fatedier 已提交
73 74
	if c, ok := cm.ctlsByRunID[runID]; ok && c == ctl {
		delete(cm.ctlsByRunID, runID)
F
fatedier 已提交
75
	}
F
fatedier 已提交
76 77
}

F
fatedier 已提交
78
func (cm *ControlManager) GetByID(runID string) (ctl *Control, ok bool) {
F
fatedier 已提交
79 80
	cm.mu.RLock()
	defer cm.mu.RUnlock()
F
fatedier 已提交
81
	ctl, ok = cm.ctlsByRunID[runID]
F
fatedier 已提交
82 83 84
	return
}

F
fatedier 已提交
85
type Control struct {
F
fatedier 已提交
86
	// all resource managers and controllers
F
fatedier 已提交
87 88 89
	rc *controller.ResourceController

	// proxy manager
F
fatedier 已提交
90
	pxyManager *proxy.Manager
F
fatedier 已提交
91

F
fatedier 已提交
92 93 94
	// plugin manager
	pluginManager *plugin.Manager

95 96 97
	// verifies authentication based on selected method
	authVerifier auth.Verifier

F
fatedier 已提交
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
	// login message
	loginMsg *msg.Login

	// control connection
	conn net.Conn

	// put a message in this channel to send it over control connection to client
	sendCh chan (msg.Message)

	// read from this channel to get the next message sent by client
	readCh chan (msg.Message)

	// work connections
	workConnCh chan net.Conn

	// proxies in one client
F
fatedier 已提交
114
	proxies map[string]proxy.Proxy
F
fatedier 已提交
115 116 117 118

	// pool count
	poolCount int

F
fatedier 已提交
119 120 121
	// ports used, for limitations
	portsUsedNum int

F
fatedier 已提交
122 123 124 125 126 127
	// last time got the Ping message
	lastPing time.Time

	// A new run id will be generated when a new client login.
	// If run id got from login message has same run id, it means it's the same client, so we can
	// replace old controller instantly.
F
fatedier 已提交
128
	runID string
F
fatedier 已提交
129 130 131 132 133 134 135 136 137 138

	// control status
	status string

	readerShutdown  *shutdown.Shutdown
	writerShutdown  *shutdown.Shutdown
	managerShutdown *shutdown.Shutdown
	allShutdown     *shutdown.Shutdown

	mu sync.RWMutex
139 140 141

	// Server configuration information
	serverCfg config.ServerCommonConf
F
fatedier 已提交
142 143 144

	xl  *xlog.Logger
	ctx context.Context
F
fatedier 已提交
145 146
}

F
fatedier 已提交
147 148 149
func NewControl(
	ctx context.Context,
	rc *controller.ResourceController,
F
fatedier 已提交
150
	pxyManager *proxy.Manager,
F
fatedier 已提交
151
	pluginManager *plugin.Manager,
152
	authVerifier auth.Verifier,
F
fatedier 已提交
153 154 155 156
	ctlConn net.Conn,
	loginMsg *msg.Login,
	serverCfg config.ServerCommonConf,
) *Control {
F
fatedier 已提交
157

F
fatedier 已提交
158 159 160 161
	poolCount := loginMsg.PoolCount
	if poolCount > int(serverCfg.MaxPoolCount) {
		poolCount = int(serverCfg.MaxPoolCount)
	}
F
fatedier 已提交
162
	return &Control{
F
fatedier 已提交
163
		rc:              rc,
F
fatedier 已提交
164
		pxyManager:      pxyManager,
F
fatedier 已提交
165
		pluginManager:   pluginManager,
166
		authVerifier:    authVerifier,
F
fatedier 已提交
167 168 169 170
		conn:            ctlConn,
		loginMsg:        loginMsg,
		sendCh:          make(chan msg.Message, 10),
		readCh:          make(chan msg.Message, 10),
F
fatedier 已提交
171
		workConnCh:      make(chan net.Conn, poolCount+10),
F
fatedier 已提交
172
		proxies:         make(map[string]proxy.Proxy),
F
fatedier 已提交
173
		poolCount:       poolCount,
F
fatedier 已提交
174
		portsUsedNum:    0,
F
fatedier 已提交
175
		lastPing:        time.Now(),
F
fatedier 已提交
176
		runID:           loginMsg.RunID,
F
fatedier 已提交
177 178 179 180 181
		status:          consts.Working,
		readerShutdown:  shutdown.New(),
		writerShutdown:  shutdown.New(),
		managerShutdown: shutdown.New(),
		allShutdown:     shutdown.New(),
182
		serverCfg:       serverCfg,
F
fatedier 已提交
183 184
		xl:              xlog.FromContextSafe(ctx),
		ctx:             ctx,
F
fatedier 已提交
185 186 187 188 189
	}
}

// Start send a login success message to client and start working.
func (ctl *Control) Start() {
F
fatedier 已提交
190
	loginRespMsg := &msg.LoginResp{
F
fatedier 已提交
191
		Version:       version.Full(),
F
fatedier 已提交
192 193
		RunID:         ctl.runID,
		ServerUDPPort: ctl.serverCfg.BindUDPPort,
F
fatedier 已提交
194
		Error:         "",
F
fatedier 已提交
195
	}
F
fatedier 已提交
196
	msg.WriteMsg(ctl.conn, loginRespMsg)
F
fatedier 已提交
197

F
fatedier 已提交
198
	go ctl.writer()
F
fatedier 已提交
199 200 201 202 203 204 205 206 207
	for i := 0; i < ctl.poolCount; i++ {
		ctl.sendCh <- &msg.ReqWorkConn{}
	}

	go ctl.manager()
	go ctl.reader()
	go ctl.stoper()
}

208
func (ctl *Control) RegisterWorkConn(conn net.Conn) error {
F
fatedier 已提交
209
	xl := ctl.xl
F
fatedier 已提交
210 211
	defer func() {
		if err := recover(); err != nil {
F
fatedier 已提交
212 213
			xl.Error("panic error: %v", err)
			xl.Error(string(debug.Stack()))
F
fatedier 已提交
214 215 216 217 218
		}
	}()

	select {
	case ctl.workConnCh <- conn:
F
fatedier 已提交
219
		xl.Debug("new work connection registered")
220
		return nil
F
fatedier 已提交
221
	default:
F
fatedier 已提交
222
		xl.Debug("work connection pool is full, discarding")
223
		return fmt.Errorf("work connection pool is full, discarding")
F
fatedier 已提交
224 225 226 227 228 229 230 231
	}
}

// When frps get one user connection, we get one work connection from the pool and return it.
// If no workConn available in the pool, send message to frpc to get one or more
// and wait until it is available.
// return an error if wait timeout
func (ctl *Control) GetWorkConn() (workConn net.Conn, err error) {
F
fatedier 已提交
232
	xl := ctl.xl
F
fatedier 已提交
233 234
	defer func() {
		if err := recover(); err != nil {
F
fatedier 已提交
235 236
			xl.Error("panic error: %v", err)
			xl.Error(string(debug.Stack()))
F
fatedier 已提交
237 238 239 240 241 242 243 244
		}
	}()

	var ok bool
	// get a work connection from the pool
	select {
	case workConn, ok = <-ctl.workConnCh:
		if !ok {
F
fatedier 已提交
245
			err = frpErr.ErrCtlClosed
F
fatedier 已提交
246 247
			return
		}
F
fatedier 已提交
248
		xl.Debug("get work connection from pool")
F
fatedier 已提交
249 250 251 252 253 254
	default:
		// no work connections available in the poll, send message to frpc to get more
		err = errors.PanicToError(func() {
			ctl.sendCh <- &msg.ReqWorkConn{}
		})
		if err != nil {
F
fatedier 已提交
255
			xl.Error("%v", err)
F
fatedier 已提交
256 257 258 259 260 261
			return
		}

		select {
		case workConn, ok = <-ctl.workConnCh:
			if !ok {
F
fatedier 已提交
262
				err = frpErr.ErrCtlClosed
F
fatedier 已提交
263
				xl.Warn("no work connections avaiable, %v", err)
F
fatedier 已提交
264 265 266
				return
			}

267
		case <-time.After(time.Duration(ctl.serverCfg.UserConnTimeout) * time.Second):
F
fatedier 已提交
268
			err = fmt.Errorf("timeout trying to get work connection")
F
fatedier 已提交
269
			xl.Warn("%v", err)
F
fatedier 已提交
270 271 272 273 274 275 276 277 278 279 280 281
			return
		}
	}

	// When we get a work connection from pool, replace it with a new one.
	errors.PanicToError(func() {
		ctl.sendCh <- &msg.ReqWorkConn{}
	})
	return
}

func (ctl *Control) Replaced(newCtl *Control) {
F
fatedier 已提交
282
	xl := ctl.xl
F
fatedier 已提交
283 284
	xl.Info("Replaced by client [%s]", newCtl.runID)
	ctl.runID = ""
F
fatedier 已提交
285 286 287 288
	ctl.allShutdown.Start()
}

func (ctl *Control) writer() {
F
fatedier 已提交
289
	xl := ctl.xl
F
fatedier 已提交
290 291
	defer func() {
		if err := recover(); err != nil {
F
fatedier 已提交
292 293
			xl.Error("panic error: %v", err)
			xl.Error(string(debug.Stack()))
F
fatedier 已提交
294 295 296 297 298 299
		}
	}()

	defer ctl.allShutdown.Start()
	defer ctl.writerShutdown.Done()

300
	encWriter, err := crypto.NewWriter(ctl.conn, []byte(ctl.serverCfg.Token))
F
fatedier 已提交
301
	if err != nil {
F
fatedier 已提交
302
		xl.Error("crypto new writer error: %v", err)
F
fatedier 已提交
303 304 305
		ctl.allShutdown.Start()
		return
	}
F
fatedier 已提交
306
	for {
F
fatedier 已提交
307 308
		m, ok := <-ctl.sendCh
		if !ok {
F
fatedier 已提交
309
			xl.Info("control writer is closing")
F
fatedier 已提交
310
			return
F
fatedier 已提交
311 312 313 314 315
		}

		if err := msg.WriteMsg(encWriter, m); err != nil {
			xl.Warn("write message to control connection error: %v", err)
			return
F
fatedier 已提交
316 317 318 319 320
		}
	}
}

func (ctl *Control) reader() {
F
fatedier 已提交
321
	xl := ctl.xl
F
fatedier 已提交
322 323
	defer func() {
		if err := recover(); err != nil {
F
fatedier 已提交
324 325
			xl.Error("panic error: %v", err)
			xl.Error(string(debug.Stack()))
F
fatedier 已提交
326 327 328 329 330 331
		}
	}()

	defer ctl.allShutdown.Start()
	defer ctl.readerShutdown.Done()

332
	encReader := crypto.NewReader(ctl.conn, []byte(ctl.serverCfg.Token))
F
fatedier 已提交
333
	for {
F
fatedier 已提交
334 335
		m, err := msg.ReadMsg(encReader)
		if err != nil {
F
fatedier 已提交
336
			if err == io.EOF {
F
fatedier 已提交
337
				xl.Debug("control connection closed")
F
fatedier 已提交
338 339
				return
			}
F
fatedier 已提交
340 341 342
			xl.Warn("read error: %v", err)
			ctl.conn.Close()
			return
F
fatedier 已提交
343
		}
F
fatedier 已提交
344 345

		ctl.readCh <- m
F
fatedier 已提交
346 347 348 349
	}
}

func (ctl *Control) stoper() {
F
fatedier 已提交
350
	xl := ctl.xl
F
fatedier 已提交
351 352
	defer func() {
		if err := recover(); err != nil {
F
fatedier 已提交
353 354
			xl.Error("panic error: %v", err)
			xl.Error(string(debug.Stack()))
F
fatedier 已提交
355 356 357 358 359 360
		}
	}()

	ctl.allShutdown.WaitStart()

	close(ctl.readCh)
F
fatedier 已提交
361
	ctl.managerShutdown.WaitDone()
F
fatedier 已提交
362 363

	close(ctl.sendCh)
F
fatedier 已提交
364
	ctl.writerShutdown.WaitDone()
F
fatedier 已提交
365 366

	ctl.conn.Close()
F
fatedier 已提交
367
	ctl.readerShutdown.WaitDone()
F
fatedier 已提交
368

F
fatedier 已提交
369 370 371
	ctl.mu.Lock()
	defer ctl.mu.Unlock()

F
fatedier 已提交
372 373 374 375 376 377 378
	close(ctl.workConnCh)
	for workConn := range ctl.workConnCh {
		workConn.Close()
	}

	for _, pxy := range ctl.proxies {
		pxy.Close()
F
fatedier 已提交
379
		ctl.pxyManager.Del(pxy.GetName())
380
		metrics.Server.CloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType)
F
fatedier 已提交
381 382 383
	}

	ctl.allShutdown.Done()
F
fatedier 已提交
384
	xl.Info("client exit success")
385
	metrics.Server.CloseClient()
F
fatedier 已提交
386 387 388 389 390
}

// block until Control closed
func (ctl *Control) WaitClosed() {
	ctl.allShutdown.WaitDone()
F
fatedier 已提交
391 392 393
}

func (ctl *Control) manager() {
F
fatedier 已提交
394
	xl := ctl.xl
F
fatedier 已提交
395 396
	defer func() {
		if err := recover(); err != nil {
F
fatedier 已提交
397 398
			xl.Error("panic error: %v", err)
			xl.Error(string(debug.Stack()))
F
fatedier 已提交
399 400 401 402 403 404 405 406 407 408 409 410
		}
	}()

	defer ctl.allShutdown.Start()
	defer ctl.managerShutdown.Done()

	heartbeat := time.NewTicker(time.Second)
	defer heartbeat.Stop()

	for {
		select {
		case <-heartbeat.C:
411
			if time.Since(ctl.lastPing) > time.Duration(ctl.serverCfg.HeartBeatTimeout)*time.Second {
F
fatedier 已提交
412
				xl.Warn("heartbeat timeout")
F
fatedier 已提交
413
				return
F
fatedier 已提交
414 415 416 417 418 419 420 421
			}
		case rawMsg, ok := <-ctl.readCh:
			if !ok {
				return
			}

			switch m := rawMsg.(type) {
			case *msg.NewProxy:
F
fatedier 已提交
422 423 424 425
				content := &plugin.NewProxyContent{
					User: plugin.UserInfo{
						User:  ctl.loginMsg.User,
						Metas: ctl.loginMsg.Metas,
F
fatedier 已提交
426
						RunID: ctl.loginMsg.RunID,
F
fatedier 已提交
427 428 429 430 431 432 433 434 435 436
					},
					NewProxy: *m,
				}
				var remoteAddr string
				retContent, err := ctl.pluginManager.NewProxy(content)
				if err == nil {
					m = &retContent.NewProxy
					remoteAddr, err = ctl.RegisterProxy(m)
				}

F
fatedier 已提交
437 438 439 440 441
				// register proxy in this control
				resp := &msg.NewProxyResp{
					ProxyName: m.ProxyName,
				}
				if err != nil {
F
fatedier 已提交
442
					xl.Warn("new proxy [%s] error: %v", m.ProxyName, err)
443
					resp.Error = util.GenerateResponseErrorString(fmt.Sprintf("new proxy [%s] error", m.ProxyName), err, ctl.serverCfg.DetailedErrorsToClient)
F
fatedier 已提交
444
				} else {
445
					resp.RemoteAddr = remoteAddr
F
fatedier 已提交
446
					xl.Info("new proxy [%s] success", m.ProxyName)
447
					metrics.Server.NewProxy(m.ProxyName, m.ProxyType)
F
fatedier 已提交
448 449
				}
				ctl.sendCh <- resp
F
fatedier 已提交
450 451
			case *msg.CloseProxy:
				ctl.CloseProxy(m)
F
fatedier 已提交
452
				xl.Info("close proxy [%s] success", m.ProxyName)
F
fatedier 已提交
453
			case *msg.Ping:
454 455 456 457
				content := &plugin.PingContent{
					User: plugin.UserInfo{
						User:  ctl.loginMsg.User,
						Metas: ctl.loginMsg.Metas,
F
fatedier 已提交
458
						RunID: ctl.loginMsg.RunID,
459 460 461 462 463 464 465 466 467
					},
					Ping: *m,
				}
				retContent, err := ctl.pluginManager.Ping(content)
				if err == nil {
					m = &retContent.Ping
					err = ctl.authVerifier.VerifyPing(m)
				}
				if err != nil {
468 469
					xl.Warn("received invalid ping: %v", err)
					ctl.sendCh <- &msg.Pong{
470
						Error: util.GenerateResponseErrorString("invalid ping", err, ctl.serverCfg.DetailedErrorsToClient),
471 472 473
					}
					return
				}
F
fatedier 已提交
474
				ctl.lastPing = time.Now()
F
fatedier 已提交
475
				xl.Debug("receive heartbeat")
F
fatedier 已提交
476 477 478 479 480 481
				ctl.sendCh <- &msg.Pong{}
			}
		}
	}
}

482
func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err error) {
F
fatedier 已提交
483 484
	var pxyConf config.ProxyConf
	// Load configures from NewProxy message and check.
485
	pxyConf, err = config.NewProxyConfFromMsg(pxyMsg, ctl.serverCfg)
F
fatedier 已提交
486
	if err != nil {
487
		return
F
fatedier 已提交
488 489
	}

490 491 492 493
	// User info
	userInfo := plugin.UserInfo{
		User:  ctl.loginMsg.User,
		Metas: ctl.loginMsg.Metas,
F
fatedier 已提交
494
		RunID: ctl.runID,
495 496
	}

F
fatedier 已提交
497 498
	// NewProxy will return a interface Proxy.
	// In fact it create different proxies by different proxy type, we just call run() here.
499
	pxy, err := proxy.NewProxy(ctl.ctx, userInfo, ctl.rc, ctl.poolCount, ctl.GetWorkConn, pxyConf, ctl.serverCfg)
F
fatedier 已提交
500
	if err != nil {
501
		return remoteAddr, err
F
fatedier 已提交
502 503
	}

F
fatedier 已提交
504
	// Check ports used number in each client
505
	if ctl.serverCfg.MaxPortsPerClient > 0 {
F
fatedier 已提交
506
		ctl.mu.Lock()
507
		if ctl.portsUsedNum+pxy.GetUsedPortsNum() > int(ctl.serverCfg.MaxPortsPerClient) {
F
fatedier 已提交
508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523
			ctl.mu.Unlock()
			err = fmt.Errorf("exceed the max_ports_per_client")
			return
		}
		ctl.portsUsedNum = ctl.portsUsedNum + pxy.GetUsedPortsNum()
		ctl.mu.Unlock()

		defer func() {
			if err != nil {
				ctl.mu.Lock()
				ctl.portsUsedNum = ctl.portsUsedNum - pxy.GetUsedPortsNum()
				ctl.mu.Unlock()
			}
		}()
	}

524
	remoteAddr, err = pxy.Run()
F
fatedier 已提交
525
	if err != nil {
526
		return
F
fatedier 已提交
527 528 529 530 531 532 533
	}
	defer func() {
		if err != nil {
			pxy.Close()
		}
	}()

F
fatedier 已提交
534
	err = ctl.pxyManager.Add(pxyMsg.ProxyName, pxy)
F
fatedier 已提交
535
	if err != nil {
536
		return
F
fatedier 已提交
537
	}
F
fatedier 已提交
538 539 540 541

	ctl.mu.Lock()
	ctl.proxies[pxy.GetName()] = pxy
	ctl.mu.Unlock()
542
	return
F
fatedier 已提交
543
}
F
fatedier 已提交
544 545 546 547 548

func (ctl *Control) CloseProxy(closeMsg *msg.CloseProxy) (err error) {
	ctl.mu.Lock()
	pxy, ok := ctl.proxies[closeMsg.ProxyName]
	if !ok {
F
fatedier 已提交
549
		ctl.mu.Unlock()
F
fatedier 已提交
550 551 552
		return
	}

553
	if ctl.serverCfg.MaxPortsPerClient > 0 {
F
fatedier 已提交
554 555
		ctl.portsUsedNum = ctl.portsUsedNum - pxy.GetUsedPortsNum()
	}
F
fatedier 已提交
556
	pxy.Close()
F
fatedier 已提交
557
	ctl.pxyManager.Del(pxy.GetName())
F
fatedier 已提交
558
	delete(ctl.proxies, closeMsg.ProxyName)
F
fatedier 已提交
559 560
	ctl.mu.Unlock()

561
	metrics.Server.CloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType)
F
fatedier 已提交
562 563
	return
}