diff --git a/reporter/grpc.go b/reporter/grpc.go index 14368909bd9128f9a3ba0cb9b3c70d8f10c7bb67..c613f3ad2ebd744dc0f76453b5a6cce093aca7a9 100644 --- a/reporter/grpc.go +++ b/reporter/grpc.go @@ -30,12 +30,14 @@ import ( managementv3 "github.com/SkyAPM/go2sky/reporter/grpc/management" "google.golang.org/grpc" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/metadata" ) const ( maxSendQueueSize int32 = 30000 defaultCheckInterval = 20 * time.Second defaultLogPrefix = "go2sky-gRPC" + authKey = "Authentication" ) // NewGRPCReporter create a new reporter to send data to gRPC oap server. Only one backend address is allowed. @@ -83,6 +85,12 @@ func WithInstanceProps(props map[string]string) GRPCReporterOption { } } +func WithAuthentication(auth string) GRPCReporterOption { + return func(r *gRPCReporter) { + r.md = metadata.New( map[string]string{authKey: auth}) + } +} + type gRPCReporter struct { service string serviceInstance string @@ -93,6 +101,7 @@ type gRPCReporter struct { traceClient agentv3.TraceSegmentReportServiceClient managementClient managementv3.ManagementServiceClient checkInterval time.Duration + md metadata.MD; } func (r *gRPCReporter) Boot(service string, serviceInstance string) { @@ -194,7 +203,7 @@ func (r *gRPCReporter) initSendPipeline() { go func() { StreamLoop: for { - stream, err := r.traceClient.Collect(context.Background()) + stream, err := r.traceClient.Collect(metadata.NewOutgoingContext(context.Background(), r.md)) if err != nil { r.logger.Printf("open stream error %v", err) time.Sleep(5 * time.Second) @@ -232,7 +241,7 @@ func (r *gRPCReporter) reportInstanceProperties() (err error) { }) } } - _, err = r.managementClient.ReportInstanceProperties(context.Background(), &managementv3.InstanceProperties{ + _, err = r.managementClient.ReportInstanceProperties(metadata.NewOutgoingContext(context.Background(), r.md), &managementv3.InstanceProperties{ Service: r.service, ServiceInstance: r.serviceInstance, Properties: props, @@ -261,7 +270,7 @@ func (r *gRPCReporter) check() { instancePropertiesSubmitted = true } - _, err := r.managementClient.KeepAlive(context.Background(), &managementv3.InstancePingPkg{ + _, err := r.managementClient.KeepAlive(metadata.NewOutgoingContext(context.Background(), r.md), &managementv3.InstancePingPkg{ Service: r.service, ServiceInstance: r.serviceInstance, }) diff --git a/reporter/grpc_test.go b/reporter/grpc_test.go index 126711bf9c944f3dfd19e170b3abc78561dad00f..3a04ab32ade88bf5b55b979af2657858d58d4f3b 100644 --- a/reporter/grpc_test.go +++ b/reporter/grpc_test.go @@ -166,6 +166,15 @@ func TestGRPCReporterOption(t *testing.T) { } }, }, + { + name: "with auth", + option: WithAuthentication("test"), + verifyFunc: func(t *testing.T, reporter *gRPCReporter) { + if reporter.md.Get(authKey)[0] != "test" { + t.Error("error are not set Authentication") + } + }, + }, } for _, tt := range tests {