未验证 提交 03a0d515 编写于 作者: L limfriend 提交者: GitHub

Add 'Authentication' for grpc request (#60)

上级 fdb185d6
......@@ -30,12 +30,14 @@ import (
managementv3 "github.com/SkyAPM/go2sky/reporter/grpc/management"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/metadata"
)
const (
maxSendQueueSize int32 = 30000
defaultCheckInterval = 20 * time.Second
defaultLogPrefix = "go2sky-gRPC"
authKey = "Authentication"
)
// NewGRPCReporter create a new reporter to send data to gRPC oap server. Only one backend address is allowed.
......@@ -83,6 +85,12 @@ func WithInstanceProps(props map[string]string) GRPCReporterOption {
}
}
func WithAuthentication(auth string) GRPCReporterOption {
return func(r *gRPCReporter) {
r.md = metadata.New( map[string]string{authKey: auth})
}
}
type gRPCReporter struct {
service string
serviceInstance string
......@@ -93,6 +101,7 @@ type gRPCReporter struct {
traceClient agentv3.TraceSegmentReportServiceClient
managementClient managementv3.ManagementServiceClient
checkInterval time.Duration
md metadata.MD;
}
func (r *gRPCReporter) Boot(service string, serviceInstance string) {
......@@ -194,7 +203,7 @@ func (r *gRPCReporter) initSendPipeline() {
go func() {
StreamLoop:
for {
stream, err := r.traceClient.Collect(context.Background())
stream, err := r.traceClient.Collect(metadata.NewOutgoingContext(context.Background(), r.md))
if err != nil {
r.logger.Printf("open stream error %v", err)
time.Sleep(5 * time.Second)
......@@ -232,7 +241,7 @@ func (r *gRPCReporter) reportInstanceProperties() (err error) {
})
}
}
_, err = r.managementClient.ReportInstanceProperties(context.Background(), &managementv3.InstanceProperties{
_, err = r.managementClient.ReportInstanceProperties(metadata.NewOutgoingContext(context.Background(), r.md), &managementv3.InstanceProperties{
Service: r.service,
ServiceInstance: r.serviceInstance,
Properties: props,
......@@ -261,7 +270,7 @@ func (r *gRPCReporter) check() {
instancePropertiesSubmitted = true
}
_, err := r.managementClient.KeepAlive(context.Background(), &managementv3.InstancePingPkg{
_, err := r.managementClient.KeepAlive(metadata.NewOutgoingContext(context.Background(), r.md), &managementv3.InstancePingPkg{
Service: r.service,
ServiceInstance: r.serviceInstance,
})
......
......@@ -166,6 +166,15 @@ func TestGRPCReporterOption(t *testing.T) {
}
},
},
{
name: "with auth",
option: WithAuthentication("test"),
verifyFunc: func(t *testing.T, reporter *gRPCReporter) {
if reporter.md.Get(authKey)[0] != "test" {
t.Error("error are not set Authentication")
}
},
},
}
for _, tt := range tests {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册