agent.go 3.2 KB
Newer Older
H
heyanlong 已提交
1 2 3
package service

import (
H
heyanlong 已提交
4
	"agent/agent/logger"
H
heyanlong 已提交
5 6
	"agent/agent/pb/agent"
	"agent/agent/pb/agent2"
H
heyanlong 已提交
7
	"agent/agent/pb/register2"
H
heyanlong 已提交
8 9
	"container/list"
	"fmt"
H
heyanlong 已提交
10
	"github.com/urfave/cli"
H
heyanlong 已提交
11 12 13
	"google.golang.org/grpc"
	"net"
	"os"
H
heyanlong 已提交
14
	"sync"
H
heyanlong 已提交
15 16 17
	"time"
)

H
heyanlong 已提交
18
var log = logger.Log
H
heyanlong 已提交
19

H
heyanlong 已提交
20 21 22 23 24
type register struct {
	c    net.Conn
	body string
}

H
heyanlong 已提交
25
type grpcClient struct {
H
heyanlong 已提交
26 27
	segmentClientV5 agent.TraceSegmentServiceClient
	segmentClientV6 agent2.TraceSegmentReportServiceClient
H
heyanlong 已提交
28 29
	pingClient5     agent.InstanceDiscoveryServiceClient
	pintClient6     register2.ServiceInstancePingClient
H
heyanlong 已提交
30 31 32
}

type Agent struct {
H
heyanlong 已提交
33 34 35 36 37 38 39 40 41 42
	flag              *cli.Context
	grpcConn          *grpc.ClientConn
	grpcClient        grpcClient
	socket            string
	socketListener    net.Listener
	register          chan *register
	registerCache     sync.Map
	registerCacheLock sync.Mutex
	trace             chan string
	queue             *list.List
H
heyanlong 已提交
43 44
}

H
heyanlong 已提交
45
func NewAgent(cli *cli.Context) *Agent {
H
heyanlong 已提交
46
	var agent = &Agent{
H
heyanlong 已提交
47 48
		flag:     cli,
		socket:   cli.String("socket"),
H
heyanlong 已提交
49 50 51 52 53 54 55 56 57 58 59
		register: make(chan *register),
		trace:    make(chan string),
		queue:    list.New(),
	}

	go agent.sub()

	return agent
}

func (t *Agent) Run() {
H
heyanlong 已提交
60 61
	log.Info("hello skywalking")
	t.connGRPC()
H
heyanlong 已提交
62 63 64
	t.listenSocket()

	defer func() {
H
heyanlong 已提交
65 66 67 68 69 70
		var err error
		err = t.socketListener.Close()
		if err != nil {
			log.Errorln(err)
		}

H
heyanlong 已提交
71
		err = t.grpcConn.Close()
H
heyanlong 已提交
72
		if err != nil {
H
heyanlong 已提交
73
			log.Errorln(err)
H
heyanlong 已提交
74 75 76 77 78 79
		}
	}()
}

func (t *Agent) connGRPC() {
	var err error
H
heyanlong 已提交
80 81
	grpcAdd := t.flag.String("grpc")
	t.grpcConn, err = grpc.Dial(grpcAdd, grpc.WithInsecure())
H
heyanlong 已提交
82
	if err != nil {
H
heyanlong 已提交
83 84
		log.Panic(err)
	}
H
heyanlong 已提交
85 86

	log.Infof("connection %s...", grpcAdd)
H
heyanlong 已提交
87 88 89 90
	t.grpcClient.segmentClientV5 = agent.NewTraceSegmentServiceClient(t.grpcConn)
	t.grpcClient.segmentClientV6 = agent2.NewTraceSegmentReportServiceClient(t.grpcConn)
	t.grpcClient.pingClient5 = agent.NewInstanceDiscoveryServiceClient(t.grpcConn)
	t.grpcClient.pintClient6 = register2.NewServiceInstancePingClient(t.grpcConn)
H
heyanlong 已提交
91 92 93 94 95
}

func (t *Agent) listenSocket() {
	var err error
	if err = os.RemoveAll(t.socket); err != nil {
H
heyanlong 已提交
96
		log.Panic(err)
H
heyanlong 已提交
97 98 99
	}
	t.socketListener, err = net.Listen("unix", t.socket)
	if err != nil {
H
heyanlong 已提交
100
		log.Panic(err)
H
heyanlong 已提交
101 102
	}

H
heyanlong 已提交
103
	err = os.Chmod(t.socket, os.ModeSocket|0777)
H
heyanlong 已提交
104
	if err != nil {
H
heyanlong 已提交
105
		log.Warningln(err)
H
heyanlong 已提交
106 107 108 109 110
	}

	for {
		c, err := t.socketListener.Accept()
		if err != nil {
H
heyanlong 已提交
111
			log.Errorln(err)
H
heyanlong 已提交
112 113 114 115 116 117 118 119 120 121
			break
		}
		// start a new goroutine to handle
		// the new connection.
		conn := NewConn(t, c)
		go conn.Handle()
	}
}

func (t *Agent) sub() {
H
heyanlong 已提交
122
	heartbeatTicker := time.NewTicker(time.Second * 40)
H
heyanlong 已提交
123
	defer heartbeatTicker.Stop()
H
heyanlong 已提交
124 125
	traceSendTicker := time.NewTicker(time.Second * time.Duration(t.flag.Int("send-rate")))
	defer traceSendTicker.Stop()
H
heyanlong 已提交
126

H
heyanlong 已提交
127 128
	for {
		select {
H
heyanlong 已提交
129 130 131
		case <-traceSendTicker.C:
			len := t.queue.Len()
			if len > 0 {
H
heyanlong 已提交
132
				var segments []*upstreamSegment
H
heyanlong 已提交
133
				for i := 0; i < len; i++ {
H
heyanlong 已提交
134
					// front top 100
H
heyanlong 已提交
135 136
					e := t.queue.Front()
					st := format(fmt.Sprintf("%v", e.Value))
H
heyanlong 已提交
137 138 139
					if st != nil {
						segments = append(segments, st)
					}
H
heyanlong 已提交
140
					t.queue.Remove(e)
H
heyanlong 已提交
141 142 143
				}
				go t.send(segments)
			}
H
heyanlong 已提交
144 145 146 147 148 149 150
		case <-heartbeatTicker.C:
			go t.heartbeat()
		case register := <-t.register:
			go t.doRegister(register)
		case trace := <-t.trace:
			t.queue.PushBack(trace)
			go t.recoverRegister(trace)
H
heyanlong 已提交
151 152 153
		}
	}
}