From 2ebeafc87432cc8c41e15e64c0c1a537618558cc Mon Sep 17 00:00:00 2001 From: heyanlong Date: Tue, 17 Sep 2019 19:23:11 +0800 Subject: [PATCH] New agent --- src/agent/service/agent.go | 118 ++++++++++++++++++------------------- 1 file changed, 58 insertions(+), 60 deletions(-) diff --git a/src/agent/service/agent.go b/src/agent/service/agent.go index 7496b9c..4145d71 100644 --- a/src/agent/service/agent.go +++ b/src/agent/service/agent.go @@ -118,6 +118,19 @@ func (t *Agent) sub() { go t.reg(register) case trace := <-t.trace: t.queue.PushBack(trace) + + if t.queue.Len() > 100 { + var segments []*upstreamSegment + for i := 0; i < 100; i++ { + // front top 100 + first := t.queue.Front().Value + st := format(fmt.Sprintf("%v", first)) + if st != nil { + segments = append(segments, st) + } + } + go t.send(segments) + } } } } @@ -127,78 +140,63 @@ func (t *Agent) reg(r *register) { fmt.Println(t.segmentClientV5) } -func (t *Agent) send() { +func (t *Agent) send(segments []*upstreamSegment) { var err error - for { - if t.queue.Len() >= 100 { - var segments []*upstreamSegment - for i := 0; i < 100; i++ { - // front top 100 - first := t.queue.Front().Value - st := format(fmt.Sprintf("%v", first)) - if st != nil { - segments = append(segments, st) - } + // process + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + ctx6, cancel6 := context.WithTimeout(context.Background(), time.Second*3) + defer cancel6() + + var stream5 agent.TraceSegmentService_CollectClient + var stream6 agent2.TraceSegmentReportService_CollectClient + + for _, segment := range segments { + log.Println("trace => Start trace...") + if segment.Version == 5 { + if stream5 == nil { + stream5, err = t.segmentClientV5.Collect(ctx) } - // process - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - ctx6, cancel6 := context.WithTimeout(context.Background(), time.Second*3) - - var stream5 agent.TraceSegmentService_CollectClient - var stream6 agent2.TraceSegmentReportService_CollectClient - - for _, segment := range segments { - log.Println("trace => Start trace...") - if segment.Version == 5 { - if stream5 == nil { - stream5, err = t.segmentClientV5.Collect(ctx) - } - - if stream5 != nil { - if err = stream5.Send(segment.segment); err != nil { - if err == io.EOF { - break - } - fmt.Println(err) - } - } else { - fmt.Println("stream not open") - } - - } else if segment.Version == 6 { - if stream6 == nil { - stream6, err = t.segmentClientV6.Collect(ctx6) - } - if stream6 != nil { - if err = stream6.Send(segment.segment); err != nil { - if err == io.EOF { - break - } - fmt.Println(err) - } - } else { - fmt.Println("stream not open") + if stream5 != nil { + if err = stream5.Send(segment.segment); err != nil { + if err == io.EOF { + break } + fmt.Println(err) } + } else { + fmt.Println("stream not open") } - if stream5 != nil { - _, err = stream5.CloseAndRecv() - if err != nil { - log.Println(err) - } + } else if segment.Version == 6 { + if stream6 == nil { + stream6, err = t.segmentClientV6.Collect(ctx6) } - if stream6 != nil { - _, err = stream6.CloseAndRecv() - if err != nil { - log.Println(err) + if err = stream6.Send(segment.segment); err != nil { + if err == io.EOF { + break + } + fmt.Println(err) } + } else { + fmt.Println("stream not open") } + } + } - cancel() - cancel6() + if stream5 != nil { + _, err = stream5.CloseAndRecv() + if err != nil { + fmt.Println(err) + } + } + + if stream6 != nil { + _, err = stream6.CloseAndRecv() + if err != nil { + fmt.Println(err) } } } -- GitLab