agent.go 4.0 KB
Newer Older
H
heyanlong 已提交
1 2 3
package service

import (
H
heyanlong 已提交
4
	"agent/agent/logger"
H
heyanlong 已提交
5 6
	"agent/agent/pb/agent"
	"agent/agent/pb/agent2"
H
heyanlong 已提交
7
	"agent/agent/pb/register2"
H
heyanlong 已提交
8 9 10
	"container/list"
	"context"
	"fmt"
H
heyanlong 已提交
11
	"github.com/urfave/cli"
H
heyanlong 已提交
12 13 14
	"google.golang.org/grpc"
	"net"
	"os"
H
heyanlong 已提交
15
	"sync"
H
heyanlong 已提交
16 17 18
	"time"
)

H
heyanlong 已提交
19
var log = logger.Log
H
heyanlong 已提交
20

H
heyanlong 已提交
21 22 23 24 25
type register struct {
	c    net.Conn
	body string
}

H
heyanlong 已提交
26
type grpcClient struct {
H
heyanlong 已提交
27 28
	segmentClientV5 agent.TraceSegmentServiceClient
	segmentClientV6 agent2.TraceSegmentReportServiceClient
H
heyanlong 已提交
29 30
	pingClient5     agent.InstanceDiscoveryServiceClient
	pintClient6     register2.ServiceInstancePingClient
H
heyanlong 已提交
31 32 33 34 35
	streamV5        agent.TraceSegmentService_CollectClient
	streamV6        agent2.TraceSegmentReportService_CollectClient
}

type Agent struct {
H
heyanlong 已提交
36 37 38 39 40 41 42 43 44 45
	flag              *cli.Context
	grpcConn          *grpc.ClientConn
	grpcClient        grpcClient
	socket            string
	socketListener    net.Listener
	register          chan *register
	registerCache     sync.Map
	registerCacheLock sync.Mutex
	trace             chan string
	queue             *list.List
H
heyanlong 已提交
46 47
}

H
heyanlong 已提交
48
func NewAgent(cli *cli.Context) *Agent {
H
heyanlong 已提交
49
	var agent = &Agent{
H
heyanlong 已提交
50 51
		flag:     cli,
		socket:   cli.String("socket"),
H
heyanlong 已提交
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
		register: make(chan *register),
		trace:    make(chan string),
		queue:    list.New(),
	}

	go func() {
		// test
		time.Sleep(time.Second * 3)
		conn, _ := net.Dial("unix", agent.socket)
		for {
			conn.Write([]byte("0{1212}\n"))
			time.Sleep(time.Second)
			fmt.Println("11")
		}

	}()

	go agent.sub()

	return agent
}

func (t *Agent) Run() {
H
heyanlong 已提交
75 76
	log.Info("hello skywalking")
	t.connGRPC()
H
heyanlong 已提交
77 78 79
	t.listenSocket()

	defer func() {
H
heyanlong 已提交
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
		var err error
		err = t.socketListener.Close()
		if err != nil {
			log.Errorln(err)
		}

		if t.grpcClient.streamV5 != nil {
			_, err = t.grpcClient.streamV5.CloseAndRecv()
			if err != nil {
				log.Errorln(err)
			}
		}

		if t.grpcClient.streamV6 != nil {
			_, err = t.grpcClient.streamV6.CloseAndRecv()
			if err != nil {
				log.Errorln(err)
			}
		}
H
heyanlong 已提交
99
		err = t.grpcConn.Close()
H
heyanlong 已提交
100
		if err != nil {
H
heyanlong 已提交
101
			log.Errorln(err)
H
heyanlong 已提交
102 103 104 105 106 107
		}
	}()
}

func (t *Agent) connGRPC() {
	var err error
H
heyanlong 已提交
108
	t.grpcConn, err = grpc.Dial(t.flag.String("grpc"), grpc.WithInsecure())
H
heyanlong 已提交
109
	if err != nil {
H
heyanlong 已提交
110 111
		log.Panic(err)
	}
H
heyanlong 已提交
112 113 114 115
	t.grpcClient.segmentClientV5 = agent.NewTraceSegmentServiceClient(t.grpcConn)
	t.grpcClient.segmentClientV6 = agent2.NewTraceSegmentReportServiceClient(t.grpcConn)
	t.grpcClient.pingClient5 = agent.NewInstanceDiscoveryServiceClient(t.grpcConn)
	t.grpcClient.pintClient6 = register2.NewServiceInstancePingClient(t.grpcConn)
H
heyanlong 已提交
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
	defer cancel()
	ctx6, cancel6 := context.WithTimeout(context.Background(), time.Second*3)
	defer cancel6()

	t.grpcClient.streamV5, err = t.grpcClient.segmentClientV5.Collect(ctx)
	if err != nil {
		log.Warningln(err)
	}

	t.grpcClient.streamV6, err = t.grpcClient.segmentClientV6.Collect(ctx6)
	if err != nil {
		log.Warningln(err)
	}

	if t.grpcClient.streamV5 == nil && t.grpcClient.streamV6 == nil {
		log.Panic("No stream available")
H
heyanlong 已提交
134 135 136 137 138 139
	}
}

func (t *Agent) listenSocket() {
	var err error
	if err = os.RemoveAll(t.socket); err != nil {
H
heyanlong 已提交
140
		log.Panic(err)
H
heyanlong 已提交
141 142 143
	}
	t.socketListener, err = net.Listen("unix", t.socket)
	if err != nil {
H
heyanlong 已提交
144
		log.Panic(err)
H
heyanlong 已提交
145 146 147 148
	}

	err = os.Chmod(t.socket, os.ModeSocket|0666)
	if err != nil {
H
heyanlong 已提交
149
		log.Warningln(err)
H
heyanlong 已提交
150 151 152 153 154
	}

	for {
		c, err := t.socketListener.Accept()
		if err != nil {
H
heyanlong 已提交
155
			log.Errorln(err)
H
heyanlong 已提交
156 157 158 159 160 161 162 163 164 165
			break
		}
		// start a new goroutine to handle
		// the new connection.
		conn := NewConn(t, c)
		go conn.Handle()
	}
}

func (t *Agent) sub() {
H
heyanlong 已提交
166 167 168
	heartbeatTicker := time.NewTicker(time.Duration(time.Second * 40))
	defer heartbeatTicker.Stop()

H
heyanlong 已提交
169 170
	for {
		select {
H
heyanlong 已提交
171 172
		case <-heartbeatTicker.C:
			go t.heartbeat()
H
heyanlong 已提交
173
		case register := <-t.register:
H
heyanlong 已提交
174
			go t.doRegister(register)
H
heyanlong 已提交
175 176
		case trace := <-t.trace:
			t.queue.PushBack(trace)
H
heyanlong 已提交
177 178 179 180 181 182 183 184 185 186 187 188 189

			if t.queue.Len() > 100 {
				var segments []*upstreamSegment
				for i := 0; i < 100; i++ {
					// front top 100
					first := t.queue.Front().Value
					st := format(fmt.Sprintf("%v", first))
					if st != nil {
						segments = append(segments, st)
					}
				}
				go t.send(segments)
			}
H
heyanlong 已提交
190 191 192
		}
	}
}