diff --git a/src/agent/main.go b/src/agent/main.go index 47764ab3d4d701a9c49641e7a493a644e5fdcfea..97ea336d03025fc14e036026587b30e1da98de2c 100644 --- a/src/agent/main.go +++ b/src/agent/main.go @@ -320,7 +320,6 @@ func heartbeat() { } func main() { - fmt.Println("hello skywalking") a := service.NewAgent() a.Run() } diff --git a/src/agent/service/agent.go b/src/agent/service/agent.go index a5b13ac442ed30e10c094ff3643e119f4232345d..b2bb09c1f56244f5551ae42a8731446302ec473e 100644 --- a/src/agent/service/agent.go +++ b/src/agent/service/agent.go @@ -6,31 +6,47 @@ import ( "container/list" "context" "fmt" + "github.com/sirupsen/logrus" "google.golang.org/grpc" "io" - "log" "net" "os" "sync" "time" ) +var log = logrus.New() + +func init() { + log.SetOutput(os.Stdout) + log.SetFormatter(&logrus.TextFormatter{ + FullTimestamp: true, + TimestampFormat: "2006-01-02 15:04:05", + }) +} + type register struct { c net.Conn body string } -type Agent struct { - grpc string - conn *grpc.ClientConn +type grpcClient struct { segmentClientV5 agent.TraceSegmentServiceClient segmentClientV6 agent2.TraceSegmentReportServiceClient - socket string - socketListener net.Listener - register chan *register - registerCache sync.Map - trace chan string - queue *list.List + streamV5 agent.TraceSegmentService_CollectClient + streamV6 agent2.TraceSegmentReportService_CollectClient +} + +type Agent struct { + grpc string + conn *grpc.ClientConn + grpcClient grpcClient + socket string + socketListener net.Listener + register chan *register + registerCache sync.Map + trace chan string + queue *list.List } func NewAgent() *Agent { @@ -59,17 +75,33 @@ func NewAgent() *Agent { } func (t *Agent) Run() { - //t.connGRPC() + log.Info("hello skywalking") + t.connGRPC() t.listenSocket() defer func() { - //err := t.conn.Close() - //if err != nil { - // fmt.Println(err) - //} - err := t.socketListener.Close() + var err error + err = t.socketListener.Close() + if err != nil { + log.Errorln(err) + } + + if t.grpcClient.streamV5 != nil { + _, err = t.grpcClient.streamV5.CloseAndRecv() + if err != nil { + log.Errorln(err) + } + } + + if t.grpcClient.streamV6 != nil { + _, err = t.grpcClient.streamV6.CloseAndRecv() + if err != nil { + log.Errorln(err) + } + } + err = t.conn.Close() if err != nil { - fmt.Println(err) + log.Errorln(err) } }() } @@ -78,31 +110,50 @@ func (t *Agent) connGRPC() { var err error t.conn, err = grpc.Dial(t.grpc, grpc.WithInsecure()) if err != nil { - panic(err) + log.Panic(err) + } + t.grpcClient.segmentClientV5 = agent.NewTraceSegmentServiceClient(t.conn) + t.grpcClient.segmentClientV6 = agent2.NewTraceSegmentReportServiceClient(t.conn) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + ctx6, cancel6 := context.WithTimeout(context.Background(), time.Second*3) + defer cancel6() + + t.grpcClient.streamV5, err = t.grpcClient.segmentClientV5.Collect(ctx) + if err != nil { + log.Warningln(err) + } + + t.grpcClient.streamV6, err = t.grpcClient.segmentClientV6.Collect(ctx6) + if err != nil { + log.Warningln(err) + } + + if t.grpcClient.streamV5 == nil && t.grpcClient.streamV6 == nil { + log.Panic("No stream available") } - t.segmentClientV5 = agent.NewTraceSegmentServiceClient(t.conn) - t.segmentClientV6 = agent2.NewTraceSegmentReportServiceClient(t.conn) } func (t *Agent) listenSocket() { var err error if err = os.RemoveAll(t.socket); err != nil { - panic(err) + log.Panic(err) } t.socketListener, err = net.Listen("unix", t.socket) if err != nil { - panic(err) + log.Panic(err) } err = os.Chmod(t.socket, os.ModeSocket|0666) if err != nil { - fmt.Println(err) + log.Warningln(err) } for { c, err := t.socketListener.Accept() if err != nil { - fmt.Println(err) + log.Errorln(err) break } // start a new goroutine to handle @@ -121,7 +172,7 @@ func (t *Agent) sub() { case <-heartbeatTicker.C: go t.heartbeat() case register := <-t.register: - go t.reg(register) + go t.doRegister(register) case trace := <-t.trace: t.queue.PushBack(trace) @@ -141,31 +192,19 @@ func (t *Agent) sub() { } } -func (t *Agent) reg(r *register) { +func (t *Agent) doRegister(r *register) { fmt.Println(r) - fmt.Println(t.segmentClientV5) + fmt.Println(t.grpcClient.segmentClientV5) } func (t *Agent) send(segments []*upstreamSegment) { var err error - // process - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - defer cancel() - ctx6, cancel6 := context.WithTimeout(context.Background(), time.Second*3) - defer cancel6() - - var stream5 agent.TraceSegmentService_CollectClient - var stream6 agent2.TraceSegmentReportService_CollectClient for _, segment := range segments { log.Println("trace => Start trace...") if segment.Version == 5 { - if stream5 == nil { - stream5, err = t.segmentClientV5.Collect(ctx) - } - - if stream5 != nil { - if err = stream5.Send(segment.segment); err != nil { + if t.grpcClient.streamV5 != nil { + if err = t.grpcClient.streamV5.Send(segment.segment); err != nil { if err == io.EOF { break } @@ -176,11 +215,8 @@ func (t *Agent) send(segments []*upstreamSegment) { } } else if segment.Version == 6 { - if stream6 == nil { - stream6, err = t.segmentClientV6.Collect(ctx6) - } - if stream6 != nil { - if err = stream6.Send(segment.segment); err != nil { + if t.grpcClient.streamV6 != nil { + if err = t.grpcClient.streamV6.Send(segment.segment); err != nil { if err == io.EOF { break } @@ -191,18 +227,4 @@ func (t *Agent) send(segments []*upstreamSegment) { } } } - - if stream5 != nil { - _, err = stream5.CloseAndRecv() - if err != nil { - fmt.Println(err) - } - } - - if stream6 != nil { - _, err = stream6.CloseAndRecv() - if err != nil { - fmt.Println(err) - } - } } diff --git a/src/agent/service/trace.go b/src/agent/service/trace.go index a36a27e1aea48c04e497a108e621088a44f60afc..3ff4501d4839e01760f6389af2c683025e6b5d01 100644 --- a/src/agent/service/trace.go +++ b/src/agent/service/trace.go @@ -6,7 +6,6 @@ import ( "agent/agent/pb/common" "encoding/json" "github.com/golang/protobuf/proto" - "log" "strconv" "strings" ) diff --git a/src/go.mod b/src/go.mod index 33ffbcd07cd758727dc9a29e52e9f6cc9770fee7..1ffd18757271b49d809bb41587cc30c4bd4993df 100644 --- a/src/go.mod +++ b/src/go.mod @@ -3,5 +3,6 @@ module agent require ( github.com/golang/protobuf v1.3.1 github.com/google/uuid v1.1.1 + github.com/sirupsen/logrus v1.4.2 google.golang.org/grpc v1.21.1 ) diff --git a/src/go.sum b/src/go.sum index a2acb1e9ba2ee7a0725690f7812c03533579e4c2..54ac950ab204df6ce8544027d8797efb2b127886 100644 --- a/src/go.sum +++ b/src/go.sum @@ -1,6 +1,7 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= @@ -10,6 +11,12 @@ github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628= @@ -18,6 +25,8 @@ golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAG golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=