control.go 9.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

F
fatedier 已提交
15 16 17 18 19 20 21 22
package server

import (
	"fmt"
	"io"
	"sync"
	"time"

F
fatedier 已提交
23
	"github.com/fatedier/frp/g"
F
fatedier 已提交
24 25
	"github.com/fatedier/frp/models/config"
	"github.com/fatedier/frp/models/consts"
F
fatedier 已提交
26
	frpErr "github.com/fatedier/frp/models/errors"
F
fatedier 已提交
27 28 29
	"github.com/fatedier/frp/models/msg"
	"github.com/fatedier/frp/utils/net"
	"github.com/fatedier/frp/utils/version"
F
fatedier 已提交
30

F
fatedier 已提交
31
	"github.com/fatedier/golib/control/shutdown"
F
fatedier 已提交
32 33
	"github.com/fatedier/golib/crypto"
	"github.com/fatedier/golib/errors"
F
fatedier 已提交
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
)

type Control struct {
	// frps service
	svr *Service

	// login message
	loginMsg *msg.Login

	// control connection
	conn net.Conn

	// put a message in this channel to send it over control connection to client
	sendCh chan (msg.Message)

	// read from this channel to get the next message sent by client
	readCh chan (msg.Message)

	// work connections
	workConnCh chan net.Conn

	// proxies in one client
F
fatedier 已提交
56
	proxies map[string]Proxy
F
fatedier 已提交
57 58 59 60

	// pool count
	poolCount int

F
fatedier 已提交
61 62 63
	// ports used, for limitations
	portsUsedNum int

F
fatedier 已提交
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
	// last time got the Ping message
	lastPing time.Time

	// A new run id will be generated when a new client login.
	// If run id got from login message has same run id, it means it's the same client, so we can
	// replace old controller instantly.
	runId string

	// control status
	status string

	readerShutdown  *shutdown.Shutdown
	writerShutdown  *shutdown.Shutdown
	managerShutdown *shutdown.Shutdown
	allShutdown     *shutdown.Shutdown

	mu sync.RWMutex
}

func NewControl(svr *Service, ctlConn net.Conn, loginMsg *msg.Login) *Control {
	return &Control{
		svr:             svr,
		conn:            ctlConn,
		loginMsg:        loginMsg,
		sendCh:          make(chan msg.Message, 10),
		readCh:          make(chan msg.Message, 10),
		workConnCh:      make(chan net.Conn, loginMsg.PoolCount+10),
F
fatedier 已提交
91
		proxies:         make(map[string]Proxy),
F
fatedier 已提交
92
		poolCount:       loginMsg.PoolCount,
F
fatedier 已提交
93
		portsUsedNum:    0,
F
fatedier 已提交
94 95 96 97 98 99 100 101 102 103 104 105
		lastPing:        time.Now(),
		runId:           loginMsg.RunId,
		status:          consts.Working,
		readerShutdown:  shutdown.New(),
		writerShutdown:  shutdown.New(),
		managerShutdown: shutdown.New(),
		allShutdown:     shutdown.New(),
	}
}

// Start send a login success message to client and start working.
func (ctl *Control) Start() {
F
fatedier 已提交
106
	loginRespMsg := &msg.LoginResp{
F
fatedier 已提交
107 108
		Version:       version.Full(),
		RunId:         ctl.runId,
F
fatedier 已提交
109
		ServerUdpPort: g.GlbServerCfg.BindUdpPort,
F
fatedier 已提交
110
		Error:         "",
F
fatedier 已提交
111
	}
F
fatedier 已提交
112
	msg.WriteMsg(ctl.conn, loginRespMsg)
F
fatedier 已提交
113

F
fatedier 已提交
114
	go ctl.writer()
F
fatedier 已提交
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
	for i := 0; i < ctl.poolCount; i++ {
		ctl.sendCh <- &msg.ReqWorkConn{}
	}

	go ctl.manager()
	go ctl.reader()
	go ctl.stoper()
}

func (ctl *Control) RegisterWorkConn(conn net.Conn) {
	defer func() {
		if err := recover(); err != nil {
			ctl.conn.Error("panic error: %v", err)
		}
	}()

	select {
	case ctl.workConnCh <- conn:
F
fatedier 已提交
133
		ctl.conn.Debug("new work connection registered")
F
fatedier 已提交
134
	default:
F
fatedier 已提交
135
		ctl.conn.Debug("work connection pool is full, discarding")
F
fatedier 已提交
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
		conn.Close()
	}
}

// When frps get one user connection, we get one work connection from the pool and return it.
// If no workConn available in the pool, send message to frpc to get one or more
// and wait until it is available.
// return an error if wait timeout
func (ctl *Control) GetWorkConn() (workConn net.Conn, err error) {
	defer func() {
		if err := recover(); err != nil {
			ctl.conn.Error("panic error: %v", err)
		}
	}()

	var ok bool
	// get a work connection from the pool
	select {
	case workConn, ok = <-ctl.workConnCh:
		if !ok {
F
fatedier 已提交
156
			err = frpErr.ErrCtlClosed
F
fatedier 已提交
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
			return
		}
		ctl.conn.Debug("get work connection from pool")
	default:
		// no work connections available in the poll, send message to frpc to get more
		err = errors.PanicToError(func() {
			ctl.sendCh <- &msg.ReqWorkConn{}
		})
		if err != nil {
			ctl.conn.Error("%v", err)
			return
		}

		select {
		case workConn, ok = <-ctl.workConnCh:
			if !ok {
F
fatedier 已提交
173
				err = frpErr.ErrCtlClosed
F
fatedier 已提交
174
				ctl.conn.Warn("no work connections avaiable, %v", err)
F
fatedier 已提交
175 176 177
				return
			}

F
fatedier 已提交
178
		case <-time.After(time.Duration(g.GlbServerCfg.UserConnTimeout) * time.Second):
F
fatedier 已提交
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
			err = fmt.Errorf("timeout trying to get work connection")
			ctl.conn.Warn("%v", err)
			return
		}
	}

	// When we get a work connection from pool, replace it with a new one.
	errors.PanicToError(func() {
		ctl.sendCh <- &msg.ReqWorkConn{}
	})
	return
}

// Replaced logs that newCtl has replaced this control, clears this control's
// run id and starts the shutdown of the whole control.
func (ctl *Control) Replaced(newCtl *Control) {
	successor := newCtl.runId
	ctl.conn.Info("Replaced by client [%s]", successor)

	ctl.runId = ""
	ctl.allShutdown.Start()
}

func (ctl *Control) writer() {
	defer func() {
		if err := recover(); err != nil {
			ctl.conn.Error("panic error: %v", err)
		}
	}()

	defer ctl.allShutdown.Start()
	defer ctl.writerShutdown.Done()

F
fatedier 已提交
208
	encWriter, err := crypto.NewWriter(ctl.conn, []byte(g.GlbServerCfg.Token))
F
fatedier 已提交
209 210 211 212 213
	if err != nil {
		ctl.conn.Error("crypto new writer error: %v", err)
		ctl.allShutdown.Start()
		return
	}
F
fatedier 已提交
214 215 216 217 218
	for {
		if m, ok := <-ctl.sendCh; !ok {
			ctl.conn.Info("control writer is closing")
			return
		} else {
F
fatedier 已提交
219
			if err := msg.WriteMsg(encWriter, m); err != nil {
F
fatedier 已提交
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236
				ctl.conn.Warn("write message to control connection error: %v", err)
				return
			}
		}
	}
}

func (ctl *Control) reader() {
	defer func() {
		if err := recover(); err != nil {
			ctl.conn.Error("panic error: %v", err)
		}
	}()

	defer ctl.allShutdown.Start()
	defer ctl.readerShutdown.Done()

F
fatedier 已提交
237
	encReader := crypto.NewReader(ctl.conn, []byte(g.GlbServerCfg.Token))
F
fatedier 已提交
238
	for {
F
fatedier 已提交
239
		if m, err := msg.ReadMsg(encReader); err != nil {
F
fatedier 已提交
240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
			if err == io.EOF {
				ctl.conn.Debug("control connection closed")
				return
			} else {
				ctl.conn.Warn("read error: %v", err)
				return
			}
		} else {
			ctl.readCh <- m
		}
	}
}

func (ctl *Control) stoper() {
	defer func() {
		if err := recover(); err != nil {
			ctl.conn.Error("panic error: %v", err)
		}
	}()

	ctl.allShutdown.WaitStart()

	close(ctl.readCh)
F
fatedier 已提交
263
	ctl.managerShutdown.WaitDone()
F
fatedier 已提交
264 265

	close(ctl.sendCh)
F
fatedier 已提交
266
	ctl.writerShutdown.WaitDone()
F
fatedier 已提交
267 268

	ctl.conn.Close()
F
fatedier 已提交
269
	ctl.readerShutdown.WaitDone()
F
fatedier 已提交
270

F
fatedier 已提交
271 272 273
	ctl.mu.Lock()
	defer ctl.mu.Unlock()

F
fatedier 已提交
274 275 276 277 278 279 280
	close(ctl.workConnCh)
	for workConn := range ctl.workConnCh {
		workConn.Close()
	}

	for _, pxy := range ctl.proxies {
		pxy.Close()
F
fatedier 已提交
281
		ctl.svr.DelProxy(pxy.GetName())
282
		StatsCloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType)
F
fatedier 已提交
283 284 285
	}

	ctl.allShutdown.Done()
F
fatedier 已提交
286
	ctl.conn.Info("client exit success")
287 288

	StatsCloseClient()
F
fatedier 已提交
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306
}

func (ctl *Control) manager() {
	defer func() {
		if err := recover(); err != nil {
			ctl.conn.Error("panic error: %v", err)
		}
	}()

	defer ctl.allShutdown.Start()
	defer ctl.managerShutdown.Done()

	heartbeat := time.NewTicker(time.Second)
	defer heartbeat.Stop()

	for {
		select {
		case <-heartbeat.C:
F
fatedier 已提交
307
			if time.Since(ctl.lastPing) > time.Duration(g.GlbServerCfg.HeartBeatTimeout)*time.Second {
F
fatedier 已提交
308 309
				ctl.conn.Warn("heartbeat timeout")
				ctl.allShutdown.Start()
F
fatedier 已提交
310
				return
F
fatedier 已提交
311 312 313 314 315 316 317 318 319
			}
		case rawMsg, ok := <-ctl.readCh:
			if !ok {
				return
			}

			switch m := rawMsg.(type) {
			case *msg.NewProxy:
				// register proxy in this control
320
				remoteAddr, err := ctl.RegisterProxy(m)
F
fatedier 已提交
321 322 323 324 325 326 327
				resp := &msg.NewProxyResp{
					ProxyName: m.ProxyName,
				}
				if err != nil {
					resp.Error = err.Error()
					ctl.conn.Warn("new proxy [%s] error: %v", m.ProxyName, err)
				} else {
328
					resp.RemoteAddr = remoteAddr
F
fatedier 已提交
329
					ctl.conn.Info("new proxy [%s] success", m.ProxyName)
330
					StatsNewProxy(m.ProxyName, m.ProxyType)
F
fatedier 已提交
331 332
				}
				ctl.sendCh <- resp
F
fatedier 已提交
333 334 335
			case *msg.CloseProxy:
				ctl.CloseProxy(m)
				ctl.conn.Info("close proxy [%s] success", m.ProxyName)
F
fatedier 已提交
336 337
			case *msg.Ping:
				ctl.lastPing = time.Now()
F
fatedier 已提交
338
				ctl.conn.Debug("receive heartbeat")
F
fatedier 已提交
339 340 341 342 343 344
				ctl.sendCh <- &msg.Pong{}
			}
		}
	}
}

345
func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err error) {
F
fatedier 已提交
346 347
	var pxyConf config.ProxyConf
	// Load configures from NewProxy message and check.
F
fatedier 已提交
348
	pxyConf, err = config.NewProxyConfFromMsg(pxyMsg)
F
fatedier 已提交
349
	if err != nil {
350
		return
F
fatedier 已提交
351 352 353 354 355 356
	}

	// NewProxy will return a interface Proxy.
	// In fact it create different proxies by different proxy type, we just call run() here.
	pxy, err := NewProxy(ctl, pxyConf)
	if err != nil {
357
		return remoteAddr, err
F
fatedier 已提交
358 359
	}

F
fatedier 已提交
360
	// Check ports used number in each client
F
fatedier 已提交
361
	if g.GlbServerCfg.MaxPortsPerClient > 0 {
F
fatedier 已提交
362
		ctl.mu.Lock()
F
fatedier 已提交
363
		if ctl.portsUsedNum+pxy.GetUsedPortsNum() > int(g.GlbServerCfg.MaxPortsPerClient) {
F
fatedier 已提交
364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379
			ctl.mu.Unlock()
			err = fmt.Errorf("exceed the max_ports_per_client")
			return
		}
		ctl.portsUsedNum = ctl.portsUsedNum + pxy.GetUsedPortsNum()
		ctl.mu.Unlock()

		defer func() {
			if err != nil {
				ctl.mu.Lock()
				ctl.portsUsedNum = ctl.portsUsedNum - pxy.GetUsedPortsNum()
				ctl.mu.Unlock()
			}
		}()
	}

380
	remoteAddr, err = pxy.Run()
F
fatedier 已提交
381
	if err != nil {
382
		return
F
fatedier 已提交
383 384 385 386 387 388 389 390 391
	}
	defer func() {
		if err != nil {
			pxy.Close()
		}
	}()

	err = ctl.svr.RegisterProxy(pxyMsg.ProxyName, pxy)
	if err != nil {
392
		return
F
fatedier 已提交
393
	}
F
fatedier 已提交
394 395 396 397

	ctl.mu.Lock()
	ctl.proxies[pxy.GetName()] = pxy
	ctl.mu.Unlock()
398
	return
F
fatedier 已提交
399
}
F
fatedier 已提交
400 401 402 403 404 405

func (ctl *Control) CloseProxy(closeMsg *msg.CloseProxy) (err error) {
	ctl.mu.Lock()

	pxy, ok := ctl.proxies[closeMsg.ProxyName]
	if !ok {
F
fatedier 已提交
406
		ctl.mu.Unlock()
F
fatedier 已提交
407 408 409
		return
	}

F
fatedier 已提交
410
	if g.GlbServerCfg.MaxPortsPerClient > 0 {
F
fatedier 已提交
411 412
		ctl.portsUsedNum = ctl.portsUsedNum - pxy.GetUsedPortsNum()
	}
F
fatedier 已提交
413 414
	pxy.Close()
	ctl.svr.DelProxy(pxy.GetName())
F
fatedier 已提交
415
	delete(ctl.proxies, closeMsg.ProxyName)
F
fatedier 已提交
416 417
	ctl.mu.Unlock()

F
fatedier 已提交
418 419 420
	StatsCloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType)
	return
}