tcpmux.go 5.4 KB
Newer Older
G
Guy Lewin 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
// Copyright 2020 guylewin, guy@lewin.co.il
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package group

import (
	"context"
	"fmt"
	"net"
	"sync"

F
fatedier 已提交
23 24 25
	"github.com/fatedier/frp/pkg/consts"
	"github.com/fatedier/frp/pkg/util/tcpmux"
	"github.com/fatedier/frp/pkg/util/vhost"
G
Guy Lewin 已提交
26 27 28 29

	gerr "github.com/fatedier/golib/errors"
)

F
fatedier 已提交
30 31 32
// TCPMuxGroupCtl manage all TCPMuxGroups
type TCPMuxGroupCtl struct {
	groups map[string]*TCPMuxGroup
G
Guy Lewin 已提交
33 34

	// portManager is used to manage port
F
fatedier 已提交
35
	tcpMuxHTTPConnectMuxer *tcpmux.HTTPConnectTCPMuxer
G
Guy Lewin 已提交
36 37 38
	mu                     sync.Mutex
}

F
fatedier 已提交
39 40 41 42 43
// NewTCPMuxGroupCtl return a new TCPMuxGroupCtl
func NewTCPMuxGroupCtl(tcpMuxHTTPConnectMuxer *tcpmux.HTTPConnectTCPMuxer) *TCPMuxGroupCtl {
	return &TCPMuxGroupCtl{
		groups:                 make(map[string]*TCPMuxGroup),
		tcpMuxHTTPConnectMuxer: tcpMuxHTTPConnectMuxer,
G
Guy Lewin 已提交
44 45 46
	}
}

F
fatedier 已提交
47
// Listen is the wrapper for TCPMuxGroup's Listen
G
Guy Lewin 已提交
48
// If there are no group, we will create one here
F
fatedier 已提交
49 50 51
func (tmgc *TCPMuxGroupCtl) Listen(ctx context.Context, multiplexer string, group string, groupKey string,
	domain string) (l net.Listener, err error) {

G
Guy Lewin 已提交
52 53 54
	tmgc.mu.Lock()
	tcpMuxGroup, ok := tmgc.groups[group]
	if !ok {
F
fatedier 已提交
55
		tcpMuxGroup = NewTCPMuxGroup(tmgc)
G
Guy Lewin 已提交
56 57 58 59 60
		tmgc.groups[group] = tcpMuxGroup
	}
	tmgc.mu.Unlock()

	switch multiplexer {
F
fatedier 已提交
61 62
	case consts.HTTPConnectTCPMultiplexer:
		return tcpMuxGroup.HTTPConnectListen(ctx, group, groupKey, domain)
G
Guy Lewin 已提交
63 64 65 66 67 68
	default:
		err = fmt.Errorf("unknown multiplexer [%s]", multiplexer)
		return
	}
}

F
fatedier 已提交
69 70
// RemoveGroup remove TCPMuxGroup from controller
func (tmgc *TCPMuxGroupCtl) RemoveGroup(group string) {
G
Guy Lewin 已提交
71 72 73 74 75
	tmgc.mu.Lock()
	defer tmgc.mu.Unlock()
	delete(tmgc.groups, group)
}

F
fatedier 已提交
76 77
// TCPMuxGroup route connections to different proxies
type TCPMuxGroup struct {
G
Guy Lewin 已提交
78 79 80 81 82 83 84
	group    string
	groupKey string
	domain   string

	acceptCh chan net.Conn
	index    uint64
	tcpMuxLn net.Listener
F
fatedier 已提交
85 86
	lns      []*TCPMuxGroupListener
	ctl      *TCPMuxGroupCtl
G
Guy Lewin 已提交
87 88 89
	mu       sync.Mutex
}

F
fatedier 已提交
90 91 92 93
// NewTCPMuxGroup return a new TCPMuxGroup
func NewTCPMuxGroup(ctl *TCPMuxGroupCtl) *TCPMuxGroup {
	return &TCPMuxGroup{
		lns:      make([]*TCPMuxGroupListener, 0),
G
Guy Lewin 已提交
94 95 96 97 98
		ctl:      ctl,
		acceptCh: make(chan net.Conn),
	}
}

F
fatedier 已提交
99 100
// Listen will return a new TCPMuxGroupListener
// if TCPMuxGroup already has a listener, just add a new TCPMuxGroupListener to the queues
G
Guy Lewin 已提交
101
// otherwise, listen on the real address
F
fatedier 已提交
102
func (tmg *TCPMuxGroup) HTTPConnectListen(ctx context.Context, group string, groupKey string, domain string) (ln *TCPMuxGroupListener, err error) {
G
Guy Lewin 已提交
103 104 105 106
	tmg.mu.Lock()
	defer tmg.mu.Unlock()
	if len(tmg.lns) == 0 {
		// the first listener, listen on the real address
F
fatedier 已提交
107
		routeConfig := &vhost.RouteConfig{
G
Guy Lewin 已提交
108 109
			Domain: domain,
		}
F
fatedier 已提交
110
		tcpMuxLn, errRet := tmg.ctl.tcpMuxHTTPConnectMuxer.Listen(ctx, routeConfig)
G
Guy Lewin 已提交
111 112 113
		if errRet != nil {
			return nil, errRet
		}
F
fatedier 已提交
114
		ln = newTCPMuxGroupListener(group, tmg, tcpMuxLn.Addr())
G
Guy Lewin 已提交
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132

		tmg.group = group
		tmg.groupKey = groupKey
		tmg.domain = domain
		tmg.tcpMuxLn = tcpMuxLn
		tmg.lns = append(tmg.lns, ln)
		if tmg.acceptCh == nil {
			tmg.acceptCh = make(chan net.Conn)
		}
		go tmg.worker()
	} else {
		// domain in the same group must be equal
		if tmg.group != group || tmg.domain != domain {
			return nil, ErrGroupParamsInvalid
		}
		if tmg.groupKey != groupKey {
			return nil, ErrGroupAuthFailed
		}
F
fatedier 已提交
133
		ln = newTCPMuxGroupListener(group, tmg, tmg.lns[0].Addr())
G
Guy Lewin 已提交
134 135 136 137 138
		tmg.lns = append(tmg.lns, ln)
	}
	return
}

F
fatedier 已提交
139 140
// worker is called when the real TCP listener has been created
func (tmg *TCPMuxGroup) worker() {
G
Guy Lewin 已提交
141 142 143 144 145 146 147 148 149 150 151 152 153 154
	for {
		c, err := tmg.tcpMuxLn.Accept()
		if err != nil {
			return
		}
		err = gerr.PanicToError(func() {
			tmg.acceptCh <- c
		})
		if err != nil {
			return
		}
	}
}

F
fatedier 已提交
155
func (tmg *TCPMuxGroup) Accept() <-chan net.Conn {
G
Guy Lewin 已提交
156 157 158
	return tmg.acceptCh
}

F
fatedier 已提交
159 160
// CloseListener remove the TCPMuxGroupListener from the TCPMuxGroup
func (tmg *TCPMuxGroup) CloseListener(ln *TCPMuxGroupListener) {
G
Guy Lewin 已提交
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
	tmg.mu.Lock()
	defer tmg.mu.Unlock()
	for i, tmpLn := range tmg.lns {
		if tmpLn == ln {
			tmg.lns = append(tmg.lns[:i], tmg.lns[i+1:]...)
			break
		}
	}
	if len(tmg.lns) == 0 {
		close(tmg.acceptCh)
		tmg.tcpMuxLn.Close()
		tmg.ctl.RemoveGroup(tmg.group)
	}
}

F
fatedier 已提交
176 177
// TCPMuxGroupListener
type TCPMuxGroupListener struct {
G
Guy Lewin 已提交
178
	groupName string
F
fatedier 已提交
179
	group     *TCPMuxGroup
G
Guy Lewin 已提交
180 181 182 183 184

	addr    net.Addr
	closeCh chan struct{}
}

F
fatedier 已提交
185 186
func newTCPMuxGroupListener(name string, group *TCPMuxGroup, addr net.Addr) *TCPMuxGroupListener {
	return &TCPMuxGroupListener{
G
Guy Lewin 已提交
187 188 189 190 191 192 193
		groupName: name,
		group:     group,
		addr:      addr,
		closeCh:   make(chan struct{}),
	}
}

F
fatedier 已提交
194 195
// Accept will accept connections from TCPMuxGroup
func (ln *TCPMuxGroupListener) Accept() (c net.Conn, err error) {
G
Guy Lewin 已提交
196 197 198 199 200 201 202 203 204 205 206 207
	var ok bool
	select {
	case <-ln.closeCh:
		return nil, ErrListenerClosed
	case c, ok = <-ln.group.Accept():
		if !ok {
			return nil, ErrListenerClosed
		}
		return c, nil
	}
}

F
fatedier 已提交
208
func (ln *TCPMuxGroupListener) Addr() net.Addr {
G
Guy Lewin 已提交
209 210 211 212
	return ln.addr
}

// Close close the listener
F
fatedier 已提交
213
func (ln *TCPMuxGroupListener) Close() (err error) {
G
Guy Lewin 已提交
214 215 216 217 218 219
	close(ln.closeCh)

	// remove self from TcpMuxGroup
	ln.group.CloseListener(ln)
	return
}