未验证 提交 d6a7fff1 编写于 作者: G Gao Hongtao 提交者: GitHub

Send local span (#4)

* Add span context that links all spans in a local trace context

* Implement Span
上级 f42a3acb
......@@ -27,6 +27,16 @@ proto-gen:
cd $(GRPC_PATH) && \
protoc register/*.proto --go_out=plugins=grpc:$(GOPATH)/src
.PHONY: mock-gen
mock-gen:
cd $(GRPC_PATH)/register && \
mkdir -p mock_register && \
mockgen github.com/tetratelabs/go2sky/reporter/grpc/register RegisterClient > mock_register/Register.mock.go && \
mockgen github.com/tetratelabs/go2sky/reporter/grpc/register ServiceInstancePingClient > mock_register/InstancePing.mock.go
cd $(GRPC_PATH)/language-agent-v2 && \
mkdir -p mock_trace && \
mockgen github.com/tetratelabs/go2sky/reporter/grpc/language-agent-v2 TraceSegmentReportServiceClient > mock_trace/trace.mock.go
.PHONY: all
all: vet lint test
......
package go2sky_test
import (
"context"
"github.com/tetratelabs/go2sky"
"github.com/tetratelabs/go2sky/reporter"
"log"
"time"
)
func ExampleNewTracer() {
r, err := reporter.NewGRPCReporter("localhost:11800")
if err != nil {
log.Fatalf("new reporter error %v \n", err)
}
defer r.Close()
tracer, err := go2sky.NewTracer("example", go2sky.WithReporter(r))
if err != nil {
log.Fatalf("create tracer error %v \n", err)
}
span, ctx, err := tracer.CreateLocalSpan(context.Background())
if err != nil {
log.Fatalf("create new local span error %v \n", err)
}
span.SetOperationName("invoke data")
span.Tag("kind", "outer")
time.Sleep(2 * time.Second)
subSpan, _, err := tracer.CreateLocalSpan(ctx)
if err != nil {
log.Fatalf("create new sub local span error %v \n", err)
}
subSpan.SetOperationName("invoke inner")
subSpan.Log(time.Now(), "inner", "this is right")
time.Sleep(2 * time.Second)
subSpan.End()
time.Sleep(1 * time.Second)
span.End()
time.Sleep(time.Minute)
// fmt.Print("aa")
// Output: aa
}
......@@ -3,6 +3,9 @@ module github.com/tetratelabs/go2sky
go 1.12
require (
github.com/golang/mock v1.2.0
github.com/golang/protobuf v1.3.1
github.com/google/uuid v1.1.1
github.com/pkg/errors v0.8.1 // indirect
google.golang.org/grpc v1.19.1
)
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0 h1:28o5sBqPkBsMGnC6b4MvE2TzSr5/AT4c/1fLqVGIwlk=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d h1:g9qWBGx4puODJTMVyoPrpoxPFgVGd+z1DZwjfRu4d0I=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522 h1:Ve1ORMCxvRmSXBwJK+t3Oy+V2vRW2OetUQBq4rJIkZE=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
......
package pkg
import (
"math/rand"
"sync"
"time"
)
var (
seededIDGen = rand.New(rand.NewSource(time.Now().UnixNano()))
seededIDLock sync.Mutex
)
func generateID() int64 {
seededIDLock.Lock()
defer seededIDLock.Unlock()
return seededIDGen.Int63()
}
// GenerateGlobalID generates global unique id
func GenerateGlobalID() []int64 {
return []int64{
time.Now().UnixNano(),
0,
generateID(),
}
}
// GenerateScopedGlobalID generates global unique id with a scopeId prefix
func GenerateScopedGlobalID(scopeID int64) []int64 {
return []int64{
scopeID,
time.Now().UnixNano(),
generateID(),
}
}
package pkg
import "time"
// Millisecond converts time to unix millisecond
func Millisecond(t time.Time) int64 {
return t.UnixNano() / int64(time.Millisecond)
}
package propagation
//CarrierItem is sub entity of propagation specification
type CarrierItem interface {
HeadKey() string
HeadValue() string
SetValue(t string)
IsValid() bool
}
type sw3CarrierItem struct {
}
func (s *sw3CarrierItem) HeadKey() string {
return "sw3"
}
import (
"errors"
)
func (s *sw3CarrierItem) HeadValue() string {
return ""
}
var (
errEmptyHeader = errors.New("empty header")
)
func (s *sw3CarrierItem) SetValue(t string) {
// DownstreamContext define the trace context from downstream
type DownstreamContext interface {
Header() string
}
func (s *sw3CarrierItem) IsValid() bool {
return true
}
//NewSW3CarrierItem create a new SkyWalking v3 propagation protocol carrier object
func NewSW3CarrierItem() CarrierItem {
item := new(sw3CarrierItem)
return item
}
// ContextCarrier is a data carrier of tracing context,
// it holds a snapshot for across process propagation.
type ContextCarrier struct {
items []CarrierItem
}
// GetAllItems gets all data from ContextCarrier
func (c *ContextCarrier) GetAllItems() []CarrierItem {
return c.items
}
// NewContextCarrier create a new ContextCarrier object
func NewContextCarrier() *ContextCarrier {
carrier := ContextCarrier{items: []CarrierItem{
NewSW3CarrierItem(),
}}
return &carrier
// UpstreamContext define the trace context to upstream
type UpstreamContext interface {
SetHeader(header string)
}
// Extractor is a tool specification which define how to
// extract trace parent context from propagation context
type Extractor func() (ContextCarrier, error)
type Extractor func() (DownstreamContext, error)
// Injector is a tool specification which define how to
// inject trace context into propagation context
type Injector func(carrier *ContextCarrier) error
type Injector func(carrier UpstreamContext) error
// TraceContext defines propagation specification of SkyWalking
type TraceContext struct {
sample int8
traceID []int64
parentSegmentID []int64
parentSpanID int32
parentServiceInstanceID int32
entryServiceInstanceID int32
}
// DecodeSW6 converts string header to TraceContext
func (tc *TraceContext) DecodeSW6(header string) error {
if header == "" {
return errEmptyHeader
}
return nil
}
......@@ -3,24 +3,36 @@ package reporter
import (
"context"
"errors"
"github.com/golang/protobuf/proto"
"log"
"os"
"time"
"google.golang.org/grpc"
"github.com/tetratelabs/go2sky"
"github.com/tetratelabs/go2sky/pkg"
"github.com/tetratelabs/go2sky/reporter/grpc/common"
v2 "github.com/tetratelabs/go2sky/reporter/grpc/language-agent-v2"
"github.com/tetratelabs/go2sky/reporter/grpc/register"
)
const (
maxSendQueueSize int32 = 30000
defaultPingInterval = 20 * time.Second
)
var (
errRegister = errors.New("fail to register reporter")
errServiceRegister = errors.New("fail to register service")
errInstanceRegister = errors.New("fail to instance service")
)
// NewGRPCReporter create a new reporter to send data to gRPC oap server
func NewGRPCReporter(serverAddr string, opts ...GRPCReporterOption) (go2sky.Reporter, error) {
r := &gRPCReporter{
conn: &grpc.ClientConn{},
logger: log.New(os.Stderr, "go2sky", log.LstdFlags),
logger: log.New(os.Stderr, "go2sky", log.LstdFlags),
sendCh: make(chan *common.UpstreamSegment, maxSendQueueSize),
pingInterval: defaultPingInterval,
}
for _, o := range opts {
o(r)
......@@ -30,6 +42,9 @@ func NewGRPCReporter(serverAddr string, opts ...GRPCReporterOption) (go2sky.Repo
return nil, err
}
r.conn = conn
r.registerClient = register.NewRegisterClient(conn)
r.pingClient = register.NewServiceInstancePingClient(conn)
r.traceClient = v2.NewTraceSegmentReportServiceClient(r.conn)
return r, nil
}
......@@ -44,23 +59,52 @@ func WithLogger(logger *log.Logger) GRPCReporterOption {
}
}
// WithPingInterval setup ping interval
func WithPingInterval(interval time.Duration) GRPCReporterOption {
return func(r *gRPCReporter) {
r.pingInterval = interval
}
}
type gRPCReporter struct {
conn *grpc.ClientConn
serviceID int32
instanceID int32
logger *log.Logger
serviceID int32
instanceID int32
instanceName string
logger *log.Logger
sendCh chan *common.UpstreamSegment
registerClient register.RegisterClient
conn *grpc.ClientConn
traceClient v2.TraceSegmentReportServiceClient
pingClient register.ServiceInstancePingClient
pingInterval time.Duration
}
func (r *gRPCReporter) Register(service string, instance string) error {
err := r.registerService(service)
if err != nil {
return err
func (r *gRPCReporter) Register(service string, instance string) (int32, int32, error) {
r.retryRegister(func() error {
return r.registerService(service)
})
r.retryRegister(func() error {
return r.registerInstance(instance)
})
r.initSendPipeline()
r.ping()
return r.serviceID, r.instanceID, nil
}
type retryFunction func() error
func (r *gRPCReporter) retryRegister(f retryFunction) {
for {
err := f()
if err == nil {
break
}
r.logger.Printf("register error %v \n", err)
time.Sleep(time.Second)
}
return r.registerInstance(instance)
}
func (r *gRPCReporter) registerService(name string) error {
client := register.NewRegisterClient(r.conn)
in := &register.Services{
Services: []*register.Service{
{
......@@ -68,47 +112,158 @@ func (r *gRPCReporter) registerService(name string) error {
},
},
}
mapping, err := client.DoServiceRegister(context.Background(), in)
mapping, err := r.registerClient.DoServiceRegister(context.Background(), in)
if err != nil {
return err
}
if len(mapping.Services) < 1 {
return errRegister
return errServiceRegister
}
r.serviceID = mapping.Services[0].Value
r.logger.Printf("the id of service %s is %d", name, r.serviceID)
r.logger.Printf("the id of service '%s' is %d", name, r.serviceID)
return nil
}
func (r *gRPCReporter) registerInstance(name string) error {
client := register.NewRegisterClient(r.conn)
in := &register.ServiceInstances{
Instances: []*register.ServiceInstance{
{
ServiceId: r.serviceID,
InstanceUUID: name,
Time: pkg.Millisecond(time.Now()),
},
},
}
mapping, err := client.DoServiceInstanceRegister(context.Background(), in)
mapping, err := r.registerClient.DoServiceInstanceRegister(context.Background(), in)
if err != nil {
return err
}
if len(mapping.ServiceInstances) < 1 {
return errRegister
return errInstanceRegister
}
r.instanceID = mapping.ServiceInstances[0].Value
r.logger.Printf("the id of instance %s 's id is %d", name, r.serviceID)
r.instanceName = name
r.logger.Printf("the id of instance '%s' id is %d", name, r.instanceID)
return nil
}
func (r *gRPCReporter) Send(spans []go2sky.Span) {
func (r *gRPCReporter) Send(spans []go2sky.ReportedSpan) {
spanSize := len(spans)
if spanSize < 1 {
return
}
rootSpan := spans[spanSize-1]
segment := &common.UpstreamSegment{
GlobalTraceIds: []*common.UniqueId{
{
IdParts: rootSpan.Context().TraceID,
},
},
}
segmentObject := &v2.SegmentObject{
ServiceId: r.serviceID,
ServiceInstanceId: r.instanceID,
TraceSegmentId: &common.UniqueId{
IdParts: rootSpan.Context().SegmentID,
},
Spans: make([]*v2.SpanObjectV2, spanSize),
}
for i, s := range spans {
segmentObject.Spans[i] = &v2.SpanObjectV2{
SpanId: s.Context().SpanID,
ParentSpanId: s.Context().ParentSpanID,
StartTime: s.StartTime(),
EndTime: s.EndTime(),
OperationName: s.OperationName(),
Peer: s.Peer(),
SpanType: s.SpanType(),
SpanLayer: s.SpanLayer(),
IsError: s.IsError(),
Tags: s.Tags(),
Logs: s.Logs(),
}
srr := make([]*v2.SegmentReference, 0)
if i == 0 && s.Context().ParentSpanID > -1 {
srr = append(srr, &v2.SegmentReference{
ParentSpanId: s.Context().ParentSpanID,
ParentTraceSegmentId: &common.UniqueId{
IdParts: s.Context().ParentSegmentID,
},
ParentServiceInstanceId: r.instanceID,
})
}
}
b, err := proto.Marshal(segmentObject)
if err != nil {
log.Printf("marshal segment object err %v", err)
return
}
segment.Segment = b
select {
case r.sendCh <- segment:
default:
log.Printf("reach max send buffer")
}
}
func (r *gRPCReporter) Close() {
close(r.sendCh)
err := r.conn.Close()
if err != nil {
r.logger.Print(err)
}
}
func (r *gRPCReporter) initSendPipeline() {
if r.traceClient == nil {
return
}
go func() {
StreamLoop:
for {
stream, err := r.traceClient.Collect(context.Background())
for {
select {
case s, ok := <-r.sendCh:
if !ok {
r.closeStream(stream)
return
}
err = stream.Send(s)
if err != nil {
r.logger.Printf("send segment error %v", err)
r.closeStream(stream)
continue StreamLoop
}
}
}
}
}()
}
func (r *gRPCReporter) closeStream(stream v2.TraceSegmentReportService_CollectClient) {
err := stream.CloseSend()
if err != nil {
r.logger.Printf("send closing error %v", err)
}
}
func (r *gRPCReporter) ping() {
if r.pingInterval < 0 || r.pingClient == nil {
return
}
go func() {
for {
_, err := r.pingClient.DoPing(context.Background(), &register.ServiceInstancePingPkg{
Time: pkg.Millisecond(time.Now()),
ServiceInstanceId: r.instanceID,
ServiceInstanceUUID: r.instanceName,
})
if err != nil {
r.logger.Printf("pinging error %v", err)
}
time.Sleep(r.pingInterval)
}
}()
}
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/tetratelabs/go2sky/reporter/grpc/language-agent-v2 (interfaces: TraceSegmentReportServiceClient)
// Package mock_language_agent_v2 is a generated GoMock package.
package mock_language_agent_v2
import (
context "context"
gomock "github.com/golang/mock/gomock"
language_agent_v2 "github.com/tetratelabs/go2sky/reporter/grpc/language-agent-v2"
grpc "google.golang.org/grpc"
reflect "reflect"
)
// MockTraceSegmentReportServiceClient is a mock of TraceSegmentReportServiceClient interface
type MockTraceSegmentReportServiceClient struct {
ctrl *gomock.Controller
recorder *MockTraceSegmentReportServiceClientMockRecorder
}
// MockTraceSegmentReportServiceClientMockRecorder is the mock recorder for MockTraceSegmentReportServiceClient
type MockTraceSegmentReportServiceClientMockRecorder struct {
mock *MockTraceSegmentReportServiceClient
}
// NewMockTraceSegmentReportServiceClient creates a new mock instance
func NewMockTraceSegmentReportServiceClient(ctrl *gomock.Controller) *MockTraceSegmentReportServiceClient {
mock := &MockTraceSegmentReportServiceClient{ctrl: ctrl}
mock.recorder = &MockTraceSegmentReportServiceClientMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockTraceSegmentReportServiceClient) EXPECT() *MockTraceSegmentReportServiceClientMockRecorder {
return m.recorder
}
// Collect mocks base method
func (m *MockTraceSegmentReportServiceClient) Collect(arg0 context.Context, arg1 ...grpc.CallOption) (language_agent_v2.TraceSegmentReportService_CollectClient, error) {
m.ctrl.T.Helper()
varargs := []interface{}{arg0}
for _, a := range arg1 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "Collect", varargs...)
ret0, _ := ret[0].(language_agent_v2.TraceSegmentReportService_CollectClient)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Collect indicates an expected call of Collect
func (mr *MockTraceSegmentReportServiceClientMockRecorder) Collect(arg0 interface{}, arg1 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0}, arg1...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Collect", reflect.TypeOf((*MockTraceSegmentReportServiceClient)(nil).Collect), varargs...)
}
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/tetratelabs/go2sky/reporter/grpc/register (interfaces: ServiceInstancePingClient)
// Package mock_register is a generated GoMock package.
package mock_register
import (
context "context"
gomock "github.com/golang/mock/gomock"
common "github.com/tetratelabs/go2sky/reporter/grpc/common"
register "github.com/tetratelabs/go2sky/reporter/grpc/register"
grpc "google.golang.org/grpc"
reflect "reflect"
)
// MockServiceInstancePingClient is a mock of ServiceInstancePingClient interface
type MockServiceInstancePingClient struct {
ctrl *gomock.Controller
recorder *MockServiceInstancePingClientMockRecorder
}
// MockServiceInstancePingClientMockRecorder is the mock recorder for MockServiceInstancePingClient
type MockServiceInstancePingClientMockRecorder struct {
mock *MockServiceInstancePingClient
}
// NewMockServiceInstancePingClient creates a new mock instance
func NewMockServiceInstancePingClient(ctrl *gomock.Controller) *MockServiceInstancePingClient {
mock := &MockServiceInstancePingClient{ctrl: ctrl}
mock.recorder = &MockServiceInstancePingClientMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockServiceInstancePingClient) EXPECT() *MockServiceInstancePingClientMockRecorder {
return m.recorder
}
// DoPing mocks base method
func (m *MockServiceInstancePingClient) DoPing(arg0 context.Context, arg1 *register.ServiceInstancePingPkg, arg2 ...grpc.CallOption) (*common.Commands, error) {
m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "DoPing", varargs...)
ret0, _ := ret[0].(*common.Commands)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// DoPing indicates an expected call of DoPing
func (mr *MockServiceInstancePingClientMockRecorder) DoPing(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoPing", reflect.TypeOf((*MockServiceInstancePingClient)(nil).DoPing), varargs...)
}
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/tetratelabs/go2sky/reporter/grpc/register (interfaces: RegisterClient)
// Package mock_register is a generated GoMock package.
package mock_register
import (
context "context"
gomock "github.com/golang/mock/gomock"
common "github.com/tetratelabs/go2sky/reporter/grpc/common"
register "github.com/tetratelabs/go2sky/reporter/grpc/register"
grpc "google.golang.org/grpc"
reflect "reflect"
)
// MockRegisterClient is a mock of RegisterClient interface
type MockRegisterClient struct {
ctrl *gomock.Controller
recorder *MockRegisterClientMockRecorder
}
// MockRegisterClientMockRecorder is the mock recorder for MockRegisterClient
type MockRegisterClientMockRecorder struct {
mock *MockRegisterClient
}
// NewMockRegisterClient creates a new mock instance
func NewMockRegisterClient(ctrl *gomock.Controller) *MockRegisterClient {
mock := &MockRegisterClient{ctrl: ctrl}
mock.recorder = &MockRegisterClientMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockRegisterClient) EXPECT() *MockRegisterClientMockRecorder {
return m.recorder
}
// DoEndpointRegister mocks base method
func (m *MockRegisterClient) DoEndpointRegister(arg0 context.Context, arg1 *register.Enpoints, arg2 ...grpc.CallOption) (*register.EndpointMapping, error) {
m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "DoEndpointRegister", varargs...)
ret0, _ := ret[0].(*register.EndpointMapping)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// DoEndpointRegister indicates an expected call of DoEndpointRegister
func (mr *MockRegisterClientMockRecorder) DoEndpointRegister(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoEndpointRegister", reflect.TypeOf((*MockRegisterClient)(nil).DoEndpointRegister), varargs...)
}
// DoNetworkAddressRegister mocks base method
func (m *MockRegisterClient) DoNetworkAddressRegister(arg0 context.Context, arg1 *register.NetAddresses, arg2 ...grpc.CallOption) (*register.NetAddressMapping, error) {
m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "DoNetworkAddressRegister", varargs...)
ret0, _ := ret[0].(*register.NetAddressMapping)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// DoNetworkAddressRegister indicates an expected call of DoNetworkAddressRegister
func (mr *MockRegisterClientMockRecorder) DoNetworkAddressRegister(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoNetworkAddressRegister", reflect.TypeOf((*MockRegisterClient)(nil).DoNetworkAddressRegister), varargs...)
}
// DoServiceAndNetworkAddressMappingRegister mocks base method
func (m *MockRegisterClient) DoServiceAndNetworkAddressMappingRegister(arg0 context.Context, arg1 *register.ServiceAndNetworkAddressMappings, arg2 ...grpc.CallOption) (*common.Commands, error) {
m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "DoServiceAndNetworkAddressMappingRegister", varargs...)
ret0, _ := ret[0].(*common.Commands)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// DoServiceAndNetworkAddressMappingRegister indicates an expected call of DoServiceAndNetworkAddressMappingRegister
func (mr *MockRegisterClientMockRecorder) DoServiceAndNetworkAddressMappingRegister(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoServiceAndNetworkAddressMappingRegister", reflect.TypeOf((*MockRegisterClient)(nil).DoServiceAndNetworkAddressMappingRegister), varargs...)
}
// DoServiceInstanceRegister mocks base method
func (m *MockRegisterClient) DoServiceInstanceRegister(arg0 context.Context, arg1 *register.ServiceInstances, arg2 ...grpc.CallOption) (*register.ServiceInstanceRegisterMapping, error) {
m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "DoServiceInstanceRegister", varargs...)
ret0, _ := ret[0].(*register.ServiceInstanceRegisterMapping)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// DoServiceInstanceRegister indicates an expected call of DoServiceInstanceRegister
func (mr *MockRegisterClientMockRecorder) DoServiceInstanceRegister(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoServiceInstanceRegister", reflect.TypeOf((*MockRegisterClient)(nil).DoServiceInstanceRegister), varargs...)
}
// DoServiceRegister mocks base method
func (m *MockRegisterClient) DoServiceRegister(arg0 context.Context, arg1 *register.Services, arg2 ...grpc.CallOption) (*register.ServiceRegisterMapping, error) {
m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "DoServiceRegister", varargs...)
ret0, _ := ret[0].(*register.ServiceRegisterMapping)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// DoServiceRegister indicates an expected call of DoServiceRegister
func (mr *MockRegisterClientMockRecorder) DoServiceRegister(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoServiceRegister", reflect.TypeOf((*MockRegisterClient)(nil).DoServiceRegister), varargs...)
}
package reporter
import (
"fmt"
"github.com/golang/mock/gomock"
"github.com/tetratelabs/go2sky/reporter/grpc/common"
"github.com/tetratelabs/go2sky/reporter/grpc/register"
"github.com/tetratelabs/go2sky/reporter/grpc/register/mock_register"
"log"
"math/rand"
"os"
"testing"
)
func Test_gRPCReporter_Register(t *testing.T) {
ctrl := gomock.NewController(t)
mockRegisterClient := mock_register.NewMockRegisterClient(ctrl)
reporter := &gRPCReporter{
registerClient: mockRegisterClient,
logger: log.New(os.Stderr, "go2sky", log.LstdFlags),
}
serviceID := rand.Int31()
serviceName := fmt.Sprintf("service-%d", serviceID)
mockRegisterClient.EXPECT().DoServiceRegister(
gomock.Any(),
gomock.Any(),
).Return(&register.ServiceRegisterMapping{Services: []*common.KeyIntValuePair{{
Value: serviceID,
Key: serviceName,
}}}, nil)
instanceID := rand.Int31()
instanceName := fmt.Sprintf("instance-%d", instanceID)
mockRegisterClient.EXPECT().DoServiceInstanceRegister(
gomock.Any(),
gomock.Any(),
).Return(&register.ServiceInstanceRegisterMapping{ServiceInstances: []*common.KeyIntValuePair{{
Value: instanceID,
Key: instanceName,
}}}, nil)
aServiceID, aInstanceID, err := reporter.Register(serviceName, instanceName)
if err != nil || serviceID != aServiceID || instanceID != aInstanceID {
t.Errorf("register service and instance error")
}
}
package go2sky
import "sync/atomic"
import (
"github.com/tetratelabs/go2sky/pkg"
"sync/atomic"
)
func newSegmentSpan(defaultSpan *defaultSpan, parentSpan Span) Span {
s := &segmentSpanImpl{
defaultSpan: *defaultSpan,
defaultSpan: *defaultSpan,
segmentContext: &segmentContext{},
}
if parentSpan == nil {
......@@ -13,10 +16,10 @@ func newSegmentSpan(defaultSpan *defaultSpan, parentSpan Span) Span {
if rootSpan, ok := parentSpan.(segmentSpan); ok {
if rootSpan.segmentRegister() {
s.segmentContext = rootSpan.context()
s.sc.SpanID = atomic.AddInt32(s.SpanIDGenerator, 1)
return s
}
return newSegmentRoot(s)
}
return newSegmentRoot(s)
}
......@@ -36,8 +39,9 @@ func (s *segmentSpanImpl) context() *segmentContext {
}
type segmentContext struct {
collect chan<- Span
refNum *int32
collect chan<- ReportedSpan
refNum *int32
SpanIDGenerator *int32
}
func (s *segmentSpanImpl) segmentRegister() bool {
......@@ -53,6 +57,7 @@ func (s *segmentSpanImpl) segmentRegister() bool {
}
func (s *segmentSpanImpl) End() {
s.defaultSpan.End()
go func() {
s.collect <- s
}()
......@@ -60,12 +65,13 @@ func (s *segmentSpanImpl) End() {
type rootSegmentSpan struct {
*segmentSpanImpl
notify <-chan Span
segment []Span
notify <-chan ReportedSpan
segment []ReportedSpan
doneCh chan int32
}
func (rs *rootSegmentSpan) End() {
rs.defaultSpan.End()
go func() {
rs.doneCh <- atomic.SwapInt32(rs.refNum, -1)
}()
......@@ -75,12 +81,17 @@ func newSegmentRoot(segmentSpan *segmentSpanImpl) *rootSegmentSpan {
s := &rootSegmentSpan{
segmentSpanImpl: segmentSpan,
}
s.sc.SegmentID = pkg.GenerateScopedGlobalID(int64(s.tracer.instanceID))
g := int32(0)
s.SpanIDGenerator = &g
s.sc.SpanID = g
s.sc.ParentSpanID = -1
var init int32
s.refNum = &init
ch := make(chan Span)
ch := make(chan ReportedSpan)
s.collect = ch
s.notify = ch
s.segment = make([]Span, 0, 10)
s.segment = make([]ReportedSpan, 0, 10)
s.doneCh = make(chan int32)
go func() {
total := -1
......
......@@ -86,16 +86,15 @@ func TestAsyncMultipleSegments(t *testing.T) {
}
}
func MockExtractor() (c propagation.ContextCarrier, e error) {
func MockExtractor() (c propagation.DownstreamContext, e error) {
return
}
func MockInjector(carrier *propagation.ContextCarrier) (e error) {
carrier.GetAllItems()
func MockInjector(carrier propagation.UpstreamContext) (e error) {
return
}
type Segment []Span
type Segment []ReportedSpan
type MockReporter struct {
Reporter
......@@ -104,11 +103,11 @@ type MockReporter struct {
sync.Mutex
}
func (r *MockReporter) Register(service string, instance string) error {
return nil
func (r *MockReporter) Register(service string, instance string) (int32, int32, error) {
return 0, 0, nil
}
func (r *MockReporter) Send(spans []Span) {
func (r *MockReporter) Send(spans []ReportedSpan) {
r.Mutex.Lock()
defer r.Mutex.Unlock()
r.Message = append(r.Message, spans)
......
package go2sky
import (
"math"
"time"
"github.com/tetratelabs/go2sky/pkg"
"github.com/tetratelabs/go2sky/propagation"
"github.com/tetratelabs/go2sky/reporter/grpc/common"
v2 "github.com/tetratelabs/go2sky/reporter/grpc/language-agent-v2"
)
// SpanType is used to identify entry, exit and local
type SpanType int32
const (
// SpanTypeEntry is a entry span, eg http server
SpanTypeEntry SpanType = 0
// SpanTypeExit is a exit span, eg http client
SpanTypeExit SpanType = 1
// SpanTypeLocal is a local span, eg local method invoke
SpanTypeLocal SpanType = 2
)
// BaseSpan is a base interface defines the common method sharding among different spans
type BaseSpan interface {
Context() SpanContext
}
// Span interface as common span specification
type Span interface {
BaseSpan
SetOperationName(string)
SetPeer(string)
SetSpanLayer(common.SpanLayer)
Tag(string, string)
Log(time.Time, ...string)
Error(time.Time, ...string)
End()
}
// ReportedSpan is accessed by Reporter to load reported data
type ReportedSpan interface {
BaseSpan
TraceContext() *propagation.TraceContext
StartTime() int64
EndTime() int64
OperationName() string
Peer() string
SpanType() common.SpanType
SpanLayer() common.SpanLayer
IsError() bool
Tags() []*common.KeyStringValuePair
Logs() []*v2.Log
}
func newSpanContext(parentSpan Span) SpanContext {
var sc SpanContext
if parentSpan == nil {
sc = SpanContext{}
sc.TraceID = pkg.GenerateGlobalID()
} else {
sc = parentSpan.Context()
sc.ParentSpanID = parentSpan.Context().SpanID
}
return sc
}
// SpanContext defines the relationship between spans in one trace
type SpanContext struct {
TraceID []int64
SegmentID []int64
SpanID int32
ParentSegmentID []int64
ParentSpanID int32
}
func newLocalSpan(t *Tracer) *defaultSpan {
return &defaultSpan{
tracer: t,
startTime: time.Now(),
spanType: SpanTypeLocal,
}
}
type defaultSpan struct {
tc *propagation.TraceContext
sc SpanContext
tracer *Tracer
startTime time.Time
endTime time.Time
operationName string
peer string
layer common.SpanLayer
tags []*common.KeyStringValuePair
logs []*v2.Log
isError bool
spanType SpanType
}
// For ReportedSpan
func (ds *defaultSpan) TraceContext() *propagation.TraceContext {
return ds.tc
}
func (ds *defaultSpan) StartTime() int64 {
return pkg.Millisecond(ds.startTime)
}
func (ds *defaultSpan) EndTime() int64 {
return pkg.Millisecond(ds.endTime)
}
func (ds *defaultSpan) OperationName() string {
return ds.operationName
}
func (ds *defaultSpan) Peer() string {
return ds.peer
}
func (ds *defaultSpan) SpanType() common.SpanType {
return common.SpanType(ds.spanType)
}
func (ds *defaultSpan) SpanLayer() common.SpanLayer {
return ds.layer
}
func (ds *defaultSpan) IsError() bool {
return ds.isError
}
func (ds *defaultSpan) Tags() []*common.KeyStringValuePair {
return ds.tags
}
func (ds *defaultSpan) Logs() []*v2.Log {
return ds.logs
}
// For Span
func (ds *defaultSpan) SetOperationName(name string) {
ds.operationName = name
}
func (ds *defaultSpan) SetPeer(peer string) {
ds.peer = peer
}
func (ds *defaultSpan) SetSpanLayer(layer common.SpanLayer) {
ds.layer = layer
}
func (ds *defaultSpan) Tag(key string, value string) {
ds.tags = append(ds.tags, &common.KeyStringValuePair{Key: key, Value: value})
}
func (ds *defaultSpan) Log(time time.Time, ll ...string) {
data := make([]*common.KeyStringValuePair, 0, int32(math.Ceil(float64(len(ll))/2.0)))
var kvp *common.KeyStringValuePair
for i, l := range ll {
if i%2 == 0 {
kvp = &common.KeyStringValuePair{}
data = append(data, kvp)
kvp.Key = l
} else {
kvp.Value = l
}
}
ds.logs = append(ds.logs, &v2.Log{Time: pkg.Millisecond(time), Data: data})
}
func (ds *defaultSpan) Error(time time.Time, ll ...string) {
ds.isError = true
ds.Log(time, ll...)
}
func (ds *defaultSpan) End() {
ds.endTime = time.Now()
}
func (ds *defaultSpan) Context() SpanContext {
return ds.sc
}
// SpanOption allows for functional options to adjust behaviour
// of a Span to be created by CreateLocalSpan
type SpanOption func(s *defaultSpan)
......@@ -2,9 +2,28 @@ package go2sky
import "github.com/tetratelabs/go2sky/propagation"
// WithParent setup parent context from propagation
func WithParent(cc propagation.ContextCarrier) SpanOption {
// WithDownstream setup trace sc from propagation
func WithDownstream(cc propagation.DownstreamContext) SpanOption {
return func(s *defaultSpan) {
s.ContextCarrier = cc
if cc == nil {
return
}
header := cc.Header()
if header == "" {
return
}
tc := &propagation.TraceContext{}
err := tc.DecodeSW6(cc.Header())
if err != nil {
return
}
s.tc = tc
}
}
// WithSpanType setup span type of a span
func WithSpanType(spanType SpanType) SpanOption {
return func(s *defaultSpan) {
s.spanType = spanType
}
}
......@@ -2,7 +2,7 @@ package go2sky
import (
"context"
"github.com/google/uuid"
"github.com/tetratelabs/go2sky/propagation"
)
......@@ -12,7 +12,9 @@ type Tracer struct {
instance string
reporter Reporter
// 0 not init 1 init
initFlag int32
initFlag int32
serviceID int32
instanceID int32
}
// TracerOption allows for functional options to adjust behaviour
......@@ -28,85 +30,68 @@ func NewTracer(service string, opts ...TracerOption) (tracer *Tracer, err error)
for _, opt := range opts {
opt(t)
}
if t.instance == "" {
id, err := uuid.NewUUID()
if err != nil {
return nil, err
}
t.instance = id.String()
}
if t.reporter != nil {
err := t.reporter.Register(t.service, t.instance)
serviceID, instanceID, err := t.reporter.Register(t.service, t.instance)
if err != nil {
return nil, err
}
t.initFlag = 1
t.serviceID = serviceID
t.instanceID = instanceID
}
return t, nil
}
// CreateEntrySpan creates and starts an entry span for incoming request
func (t *Tracer) CreateEntrySpan(ctx context.Context, extractor propagation.Extractor) (Span, context.Context, error) {
cc, err := extractor()
dc, err := extractor()
if err != nil {
return nil, nil, err
}
return t.CreateLocalSpan(ctx, WithParent(cc))
return t.CreateLocalSpan(ctx, WithDownstream(dc), WithSpanType(SpanTypeEntry))
}
// CreateLocalSpan creates and starts a span for local usage
func (t *Tracer) CreateLocalSpan(ctx context.Context, opts ...SpanOption) (s Span, c context.Context, err error) {
parentSpan, ok := ctx.Value(key).(Span)
if ok && parentSpan != nil {
opts = append(opts, WithParent(parentSpan.Context()))
}
ds := &defaultSpan{
tracer: t,
}
ds := newLocalSpan(t)
for _, opt := range opts {
opt(ds)
}
parentSpan, ok := ctx.Value(key).(Span)
if !ok {
parentSpan = nil
}
ds.sc = newSpanContext(parentSpan)
s = newSegmentSpan(ds, parentSpan)
return s, context.WithValue(ctx, key, s), nil
}
// CreateExitSpan creates and starts an exit span for client
func (t *Tracer) CreateExitSpan(ctx context.Context, injector propagation.Injector) (Span, error) {
s, _, err := t.CreateLocalSpan(ctx)
s, _, err := t.CreateLocalSpan(ctx, WithSpanType(SpanTypeExit))
if err != nil {
return nil, err
}
cc := s.Context()
err = injector(&cc)
if err != nil {
return nil, err
}
return s, nil
}
// Span interface as common span specification
type Span interface {
Context() propagation.ContextCarrier
End()
}
type defaultSpan struct {
propagation.ContextCarrier
tracer *Tracer
}
func (s *defaultSpan) Context() propagation.ContextCarrier {
return s.ContextCarrier
}
func (s *defaultSpan) End() {
}
// SpanOption allows for functional options to adjust behaviour
// of a Span to be created by CreateLocalSpan
type SpanOption func(s *defaultSpan)
type ctxKey struct{}
var key = ctxKey{}
//Reporter is a data transit specification
type Reporter interface {
Register(service string, instance string) error
Send(spans []Span)
Register(service string, instance string) (int32, int32, error)
Send(spans []ReportedSpan)
Close()
}
package go2sky
// WithReporter setup report pipeline for tracer
func WithReporter(reporter Reporter) TracerOption{
func WithReporter(reporter Reporter) TracerOption {
return func(t *Tracer) {
t.reporter = reporter
}
......
package go2sky
import (
"context"
"errors"
"reflect"
"testing"
)
......@@ -24,14 +26,82 @@ func TestTracerInit(t *testing.T) {
}
}
func TestTracer_CreateLocalSpan(t *testing.T) {
tracer, _ := NewTracer("", WithReporter(&mockRegisterReporter{
success: true,
}))
span, ctx, err := tracer.CreateLocalSpan(context.Background())
defer span.End()
if err != nil {
t.Error(err)
}
subSpan, _, err := tracer.CreateLocalSpan(ctx)
defer subSpan.End()
if err != nil {
t.Error(err)
}
verifySpans(t, span, subSpan)
}
func TestTracer_CreateLocalSpanAsync(t *testing.T) {
tracer, _ := NewTracer("", WithReporter(&mockRegisterReporter{
success: true,
}))
span, ctx, err := tracer.CreateLocalSpan(context.Background())
defer span.End()
if err != nil {
t.Error(err)
}
retCh := make(chan int32, 10)
defer close(retCh)
for i := 0; i < 10; i++ {
go func() {
subSpan, _, err := tracer.CreateLocalSpan(ctx)
defer subSpan.End()
if err != nil {
t.Error(err)
}
verifySpans(t, span, subSpan)
retCh <- subSpan.Context().SpanID
}()
}
m := map[int32]interface{}{}
for i := 0; i < 10; i++ {
select {
case a := <-retCh:
m[a] = 0
}
}
if len(m) != 10 {
t.Error("duplicated span id")
}
}
func verifySpans(t *testing.T, span Span, subSpan Span) {
if !reflect.DeepEqual(subSpan.Context().TraceID, span.Context().TraceID) {
t.Errorf("trace id is different %v %v", subSpan.Context().TraceID, span.Context().TraceID)
}
if subSpan.Context().ParentSpanID != span.Context().SpanID {
t.Errorf("span linking is wrong %d %d", subSpan.Context().ParentSpanID, span.Context().SpanID)
}
if subSpan.Context().SpanID == span.Context().SpanID {
t.Errorf("same span id %d", span.Context().SpanID)
}
}
type mockRegisterReporter struct {
Reporter
success bool
}
func (r *mockRegisterReporter) Register(service string, instance string) error {
func (r *mockRegisterReporter) Send(spans []ReportedSpan) {
}
func (r *mockRegisterReporter) Close() {
}
func (r *mockRegisterReporter) Register(service string, instance string) (int32, int32, error) {
if r.success {
return nil
return 1, 1, nil
}
return errRegister
return 0, 0, errRegister
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册