提交 927135da 编写于 作者: H heyanlong

New agent

上级 9120c74a
......@@ -320,7 +320,6 @@ func heartbeat() {
}
func main() {
fmt.Println("hello skywalking")
a := service.NewAgent()
a.Run()
}
......@@ -6,31 +6,47 @@ import (
"container/list"
"context"
"fmt"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"io"
"log"
"net"
"os"
"sync"
"time"
)
var log = logrus.New()
func init() {
log.SetOutput(os.Stdout)
log.SetFormatter(&logrus.TextFormatter{
FullTimestamp: true,
TimestampFormat: "2006-01-02 15:04:05",
})
}
type register struct {
c net.Conn
body string
}
type Agent struct {
grpc string
conn *grpc.ClientConn
type grpcClient struct {
segmentClientV5 agent.TraceSegmentServiceClient
segmentClientV6 agent2.TraceSegmentReportServiceClient
socket string
socketListener net.Listener
register chan *register
registerCache sync.Map
trace chan string
queue *list.List
streamV5 agent.TraceSegmentService_CollectClient
streamV6 agent2.TraceSegmentReportService_CollectClient
}
type Agent struct {
grpc string
conn *grpc.ClientConn
grpcClient grpcClient
socket string
socketListener net.Listener
register chan *register
registerCache sync.Map
trace chan string
queue *list.List
}
func NewAgent() *Agent {
......@@ -59,17 +75,33 @@ func NewAgent() *Agent {
}
func (t *Agent) Run() {
//t.connGRPC()
log.Info("hello skywalking")
t.connGRPC()
t.listenSocket()
defer func() {
//err := t.conn.Close()
//if err != nil {
// fmt.Println(err)
//}
err := t.socketListener.Close()
var err error
err = t.socketListener.Close()
if err != nil {
log.Errorln(err)
}
if t.grpcClient.streamV5 != nil {
_, err = t.grpcClient.streamV5.CloseAndRecv()
if err != nil {
log.Errorln(err)
}
}
if t.grpcClient.streamV6 != nil {
_, err = t.grpcClient.streamV6.CloseAndRecv()
if err != nil {
log.Errorln(err)
}
}
err = t.conn.Close()
if err != nil {
fmt.Println(err)
log.Errorln(err)
}
}()
}
......@@ -78,31 +110,50 @@ func (t *Agent) connGRPC() {
var err error
t.conn, err = grpc.Dial(t.grpc, grpc.WithInsecure())
if err != nil {
panic(err)
log.Panic(err)
}
t.grpcClient.segmentClientV5 = agent.NewTraceSegmentServiceClient(t.conn)
t.grpcClient.segmentClientV6 = agent2.NewTraceSegmentReportServiceClient(t.conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
ctx6, cancel6 := context.WithTimeout(context.Background(), time.Second*3)
defer cancel6()
t.grpcClient.streamV5, err = t.grpcClient.segmentClientV5.Collect(ctx)
if err != nil {
log.Warningln(err)
}
t.grpcClient.streamV6, err = t.grpcClient.segmentClientV6.Collect(ctx6)
if err != nil {
log.Warningln(err)
}
if t.grpcClient.streamV5 == nil && t.grpcClient.streamV6 == nil {
log.Panic("No stream available")
}
t.segmentClientV5 = agent.NewTraceSegmentServiceClient(t.conn)
t.segmentClientV6 = agent2.NewTraceSegmentReportServiceClient(t.conn)
}
func (t *Agent) listenSocket() {
var err error
if err = os.RemoveAll(t.socket); err != nil {
panic(err)
log.Panic(err)
}
t.socketListener, err = net.Listen("unix", t.socket)
if err != nil {
panic(err)
log.Panic(err)
}
err = os.Chmod(t.socket, os.ModeSocket|0666)
if err != nil {
fmt.Println(err)
log.Warningln(err)
}
for {
c, err := t.socketListener.Accept()
if err != nil {
fmt.Println(err)
log.Errorln(err)
break
}
// start a new goroutine to handle
......@@ -121,7 +172,7 @@ func (t *Agent) sub() {
case <-heartbeatTicker.C:
go t.heartbeat()
case register := <-t.register:
go t.reg(register)
go t.doRegister(register)
case trace := <-t.trace:
t.queue.PushBack(trace)
......@@ -141,31 +192,19 @@ func (t *Agent) sub() {
}
}
func (t *Agent) reg(r *register) {
func (t *Agent) doRegister(r *register) {
fmt.Println(r)
fmt.Println(t.segmentClientV5)
fmt.Println(t.grpcClient.segmentClientV5)
}
func (t *Agent) send(segments []*upstreamSegment) {
var err error
// process
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
ctx6, cancel6 := context.WithTimeout(context.Background(), time.Second*3)
defer cancel6()
var stream5 agent.TraceSegmentService_CollectClient
var stream6 agent2.TraceSegmentReportService_CollectClient
for _, segment := range segments {
log.Println("trace => Start trace...")
if segment.Version == 5 {
if stream5 == nil {
stream5, err = t.segmentClientV5.Collect(ctx)
}
if stream5 != nil {
if err = stream5.Send(segment.segment); err != nil {
if t.grpcClient.streamV5 != nil {
if err = t.grpcClient.streamV5.Send(segment.segment); err != nil {
if err == io.EOF {
break
}
......@@ -176,11 +215,8 @@ func (t *Agent) send(segments []*upstreamSegment) {
}
} else if segment.Version == 6 {
if stream6 == nil {
stream6, err = t.segmentClientV6.Collect(ctx6)
}
if stream6 != nil {
if err = stream6.Send(segment.segment); err != nil {
if t.grpcClient.streamV6 != nil {
if err = t.grpcClient.streamV6.Send(segment.segment); err != nil {
if err == io.EOF {
break
}
......@@ -191,18 +227,4 @@ func (t *Agent) send(segments []*upstreamSegment) {
}
}
}
if stream5 != nil {
_, err = stream5.CloseAndRecv()
if err != nil {
fmt.Println(err)
}
}
if stream6 != nil {
_, err = stream6.CloseAndRecv()
if err != nil {
fmt.Println(err)
}
}
}
......@@ -6,7 +6,6 @@ import (
"agent/agent/pb/common"
"encoding/json"
"github.com/golang/protobuf/proto"
"log"
"strconv"
"strings"
)
......
......@@ -3,5 +3,6 @@ module agent
require (
github.com/golang/protobuf v1.3.1
github.com/google/uuid v1.1.1
github.com/sirupsen/logrus v1.4.2
google.golang.org/grpc v1.21.1
)
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
......@@ -10,6 +11,12 @@ github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628=
......@@ -18,6 +25,8 @@ golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAG
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册