agent.go 4.6 KB
Newer Older
H
heyanlong 已提交
1 2 3
package service

import (
4 5 6 7 8 9
	"github.com/SkyAPM/SkyAPM-php-sdk/reporter/logger"
	nla1 "github.com/SkyAPM/SkyAPM-php-sdk/reporter/network/language/agent/v1"
	nla2 "github.com/SkyAPM/SkyAPM-php-sdk/reporter/network/language/agent/v2"
	nla3 "github.com/SkyAPM/SkyAPM-php-sdk/reporter/network/language/agent/v3"
	nm3 "github.com/SkyAPM/SkyAPM-php-sdk/reporter/network/management/v3"
	nr2 "github.com/SkyAPM/SkyAPM-php-sdk/reporter/network/register/v2"
H
heyanlong 已提交
10
	cli "github.com/urfave/cli/v2"
H
heyanlong 已提交
11
	"google.golang.org/grpc"
H
heyanlong 已提交
12
	"math/rand"
H
heyanlong 已提交
13 14
	"net"
	"os"
H
heyanlong 已提交
15
	"strings"
H
heyanlong 已提交
16
	"sync"
H
heyanlong 已提交
17 18 19
	"time"
)

H
heyanlong 已提交
20
// log is the package-level logger used throughout this file; it aliases
// logger.Log from the reporter's logger package.
var log = logger.Log
H
heyanlong 已提交
21

H
heyanlong 已提交
22 23 24 25 26
// register is the message type carried on Agent.register; the agent's event
// loop hands each received value to doRegister.
type register struct {
	// c is a network connection associated with the request.
	// NOTE(review): presumably the client connection that issued the
	// registration and that the reply is written to — confirm in doRegister.
	c    net.Conn
	// body is the request payload as a string.
	// NOTE(review): exact encoding is not visible here — confirm against the sender.
	body string
}

H
heyanlong 已提交
27
type grpcClient struct {
28 29 30 31 32 33
	tssc1  nla1.TraceSegmentServiceClient
	idsc1  nla1.InstanceDiscoveryServiceClient
	tsrsc2 nla2.TraceSegmentReportServiceClient
	tsrsc3 nla3.TraceSegmentReportServiceClient
	sipc2  nr2.ServiceInstancePingClient
	msc3   nm3.ManagementServiceClient
H
heyanlong 已提交
34 35 36
}

type Agent struct {
H
heyanlong 已提交
37
	flag              *cli.Context
38
	version           int
H
heyanlong 已提交
39 40 41 42 43
	grpcConn          *grpc.ClientConn
	grpcClient        grpcClient
	socket            string
	socketListener    net.Listener
	register          chan *register
H
heyanlong 已提交
44 45
	registerCache     map[int]registerCache
	registerCacheLock sync.RWMutex
H
heyanlong 已提交
46
	trace             chan string
H
heyanlong 已提交
47 48
	queue             []string
	queueLock         sync.Mutex
H
heyanlong 已提交
49 50
}

H
heyanlong 已提交
51
func NewAgent(cli *cli.Context) *Agent {
H
heyanlong 已提交
52
	var agent = &Agent{
H
heyanlong 已提交
53
		flag:          cli,
54
		version:       cli.Int("sky-version"),
H
heyanlong 已提交
55 56 57 58
		socket:        cli.String("socket"),
		register:      make(chan *register),
		trace:         make(chan string),
		registerCache: make(map[int]registerCache),
H
heyanlong 已提交
59 60 61 62 63 64 65 66
	}

	go agent.sub()

	return agent
}

func (t *Agent) Run() {
H
heyanlong 已提交
67
	log.Info("hello skywalking")
H
heyanlong 已提交
68 69

	defer func() {
H
heyanlong 已提交
70
		var err error
H
heyanlong 已提交
71 72 73 74 75
		if t.socketListener != nil {
			err = t.socketListener.Close()
			if err != nil {
				log.Errorln(err)
			}
H
heyanlong 已提交
76 77
		}

H
heyanlong 已提交
78 79 80 81 82
		if t.grpcConn != nil {
			err = t.grpcConn.Close()
			if err != nil {
				log.Errorln(err)
			}
H
heyanlong 已提交
83 84
		}
	}()
H
heyanlong 已提交
85 86
	t.connGRPC()
	t.listenSocket()
H
heyanlong 已提交
87 88 89 90
}

func (t *Agent) connGRPC() {
	var err error
H
heyanlong 已提交
91 92 93 94 95 96 97
	grpcAdd := t.flag.StringSlice("grpc")
	var services []string
	for _, item := range grpcAdd {
		services = append(services, strings.Split(item, ",")...)
	}

	r := rand.New(rand.NewSource(time.Now().UnixNano()))
98 99 100 101 102 103
	var grpcAddress string
	if len(services) > 0 {
		grpcAddress = services[r.Intn(len(services))]
	} else {
		log.Panic("oap server not found")
	}
H
heyanlong 已提交
104 105

	t.grpcConn, err = grpc.Dial(grpcAddress, grpc.WithInsecure())
H
heyanlong 已提交
106
	if err != nil {
H
heyanlong 已提交
107 108
		log.Panic(err)
	}
H
heyanlong 已提交
109

H
heyanlong 已提交
110
	log.Infof("connection %s...", grpcAddress)
111 112 113 114 115 116 117 118 119 120 121

	if t.version == 5 {
		t.grpcClient.tssc1 = nla1.NewTraceSegmentServiceClient(t.grpcConn)
		t.grpcClient.idsc1 = nla1.NewInstanceDiscoveryServiceClient(t.grpcConn)
	} else if t.version == 6 || t.version == 7 {
		t.grpcClient.tsrsc2 = nla2.NewTraceSegmentReportServiceClient(t.grpcConn)
		t.grpcClient.sipc2 = nr2.NewServiceInstancePingClient(t.grpcConn)
	} else if t.version == 8 {
		t.grpcClient.tsrsc3 = nla3.NewTraceSegmentReportServiceClient(t.grpcConn)
		t.grpcClient.msc3 = nm3.NewManagementServiceClient(t.grpcConn)
	}
H
heyanlong 已提交
122
	log.Info("🍺 skywalking php agent started successfully, enjoy yourself")
H
heyanlong 已提交
123 124 125 126
}

func (t *Agent) listenSocket() {
	var err error
H
heyanlong 已提交
127 128 129 130 131 132 133

	fi, _ := os.Stat(t.socket)

	if fi != nil && !fi.Mode().IsDir() {
		if err = os.RemoveAll(t.socket); err != nil {
			log.Panic(err)
		}
H
heyanlong 已提交
134
	}
H
heyanlong 已提交
135

H
heyanlong 已提交
136 137
	t.socketListener, err = net.Listen("unix", t.socket)
	if err != nil {
H
heyanlong 已提交
138
		log.Panic(err)
H
heyanlong 已提交
139 140
	}

H
heyanlong 已提交
141
	err = os.Chmod(t.socket, os.ModeSocket|0777)
H
heyanlong 已提交
142
	if err != nil {
H
heyanlong 已提交
143
		log.Warningln(err)
H
heyanlong 已提交
144 145 146 147 148
	}

	for {
		c, err := t.socketListener.Accept()
		if err != nil {
H
heyanlong 已提交
149
			log.Errorln(err)
H
heyanlong 已提交
150 151 152 153 154 155 156 157 158 159
			break
		}
		// start a new goroutine to handle
		// the new connection.
		conn := NewConn(t, c)
		go conn.Handle()
	}
}

func (t *Agent) sub() {
H
heyanlong 已提交
160
	heartbeatTicker := time.NewTicker(time.Second * 40)
H
heyanlong 已提交
161
	defer heartbeatTicker.Stop()
H
heyanlong 已提交
162 163
	traceSendTicker := time.NewTicker(time.Second * time.Duration(t.flag.Int("send-rate")))
	defer traceSendTicker.Stop()
H
heyanlong 已提交
164

H
heyanlong 已提交
165 166
	for {
		select {
H
heyanlong 已提交
167
		case <-traceSendTicker.C:
H
heyanlong 已提交
168
			len := len(t.queue)
H
heyanlong 已提交
169
			if len > 0 {
H
heyanlong 已提交
170
				var segments []*upstreamSegment
H
heyanlong 已提交
171 172 173 174 175 176 177

				t.queueLock.Lock()
				list := t.queue[:]
				t.queue = []string{}
				t.queueLock.Unlock()

				for _, trace := range list {
178
					info, st := format(t.version, trace)
H
heyanlong 已提交
179
					if st != nil {
H
heyanlong 已提交
180
						t.recoverRegister(info)
H
heyanlong 已提交
181 182 183 184 185
						segments = append(segments, st)
					}
				}
				go t.send(segments)
			}
H
heyanlong 已提交
186 187 188 189 190
		case <-heartbeatTicker.C:
			go t.heartbeat()
		case register := <-t.register:
			go t.doRegister(register)
		case trace := <-t.trace:
H
heyanlong 已提交
191
			t.queueLock.Lock()
192 193 194 195 196
			if len(t.queue) < 65535*2 {
				t.queue = append(t.queue, trace)
			} else {
				log.Warnf("trace queue is fill.")
			}
H
heyanlong 已提交
197
			t.queueLock.Unlock()
H
heyanlong 已提交
198 199 200
		}
	}
}