提交 2ebeafc8 编写于 作者: H heyanlong

New agent

上级 5d13982c
...@@ -118,6 +118,19 @@ func (t *Agent) sub() { ...@@ -118,6 +118,19 @@ func (t *Agent) sub() {
go t.reg(register) go t.reg(register)
case trace := <-t.trace: case trace := <-t.trace:
t.queue.PushBack(trace) t.queue.PushBack(trace)
if t.queue.Len() > 100 {
var segments []*upstreamSegment
for i := 0; i < 100; i++ {
// front top 100
first := t.queue.Front().Value
st := format(fmt.Sprintf("%v", first))
if st != nil {
segments = append(segments, st)
}
}
go t.send(segments)
}
} }
} }
} }
...@@ -127,78 +140,63 @@ func (t *Agent) reg(r *register) { ...@@ -127,78 +140,63 @@ func (t *Agent) reg(r *register) {
fmt.Println(t.segmentClientV5) fmt.Println(t.segmentClientV5)
} }
func (t *Agent) send() { func (t *Agent) send(segments []*upstreamSegment) {
var err error var err error
for { // process
if t.queue.Len() >= 100 { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
var segments []*upstreamSegment defer cancel()
for i := 0; i < 100; i++ { ctx6, cancel6 := context.WithTimeout(context.Background(), time.Second*3)
// front top 100 defer cancel6()
first := t.queue.Front().Value
st := format(fmt.Sprintf("%v", first)) var stream5 agent.TraceSegmentService_CollectClient
if st != nil { var stream6 agent2.TraceSegmentReportService_CollectClient
segments = append(segments, st)
} for _, segment := range segments {
log.Println("trace => Start trace...")
if segment.Version == 5 {
if stream5 == nil {
stream5, err = t.segmentClientV5.Collect(ctx)
} }
// process if stream5 != nil {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) if err = stream5.Send(segment.segment); err != nil {
ctx6, cancel6 := context.WithTimeout(context.Background(), time.Second*3) if err == io.EOF {
break
var stream5 agent.TraceSegmentService_CollectClient
var stream6 agent2.TraceSegmentReportService_CollectClient
for _, segment := range segments {
log.Println("trace => Start trace...")
if segment.Version == 5 {
if stream5 == nil {
stream5, err = t.segmentClientV5.Collect(ctx)
}
if stream5 != nil {
if err = stream5.Send(segment.segment); err != nil {
if err == io.EOF {
break
}
fmt.Println(err)
}
} else {
fmt.Println("stream not open")
}
} else if segment.Version == 6 {
if stream6 == nil {
stream6, err = t.segmentClientV6.Collect(ctx6)
}
if stream6 != nil {
if err = stream6.Send(segment.segment); err != nil {
if err == io.EOF {
break
}
fmt.Println(err)
}
} else {
fmt.Println("stream not open")
} }
fmt.Println(err)
} }
} else {
fmt.Println("stream not open")
} }
if stream5 != nil { } else if segment.Version == 6 {
_, err = stream5.CloseAndRecv() if stream6 == nil {
if err != nil { stream6, err = t.segmentClientV6.Collect(ctx6)
log.Println(err)
}
} }
if stream6 != nil { if stream6 != nil {
_, err = stream6.CloseAndRecv() if err = stream6.Send(segment.segment); err != nil {
if err != nil { if err == io.EOF {
log.Println(err) break
}
fmt.Println(err)
} }
} else {
fmt.Println("stream not open")
} }
}
}
cancel() if stream5 != nil {
cancel6() _, err = stream5.CloseAndRecv()
if err != nil {
fmt.Println(err)
}
}
if stream6 != nil {
_, err = stream6.CloseAndRecv()
if err != nil {
fmt.Println(err)
} }
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册