提交 fc3b440a 编写于 作者: G Gao Hongtao

Add handler to parse istio telemetry data.

上级 43ccfa7e
Subproject commit 33b132bffaabacbd003eec41b498d2810f386161
Subproject commit 75c74186a1548657013a299f388e6e8b7b4b5251
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.istio.telemetry.provider;
import com.google.protobuf.Timestamp;
import io.grpc.stub.StreamObserver;
import io.istio.HandleMetricServiceGrpc;
import io.istio.IstioMetricProto;
import io.istio.api.mixer.adapter.model.v1beta1.ReportProto;
import io.istio.api.policy.v1beta1.TypeProto;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import org.apache.skywalking.apm.network.servicemesh.DetectPoint;
import org.apache.skywalking.apm.network.servicemesh.Protocol;
import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Handle istio telemetry data.
*
* @author gaohongtao
*/
public class IstioTelemetryGRPCHandler extends HandleMetricServiceGrpc.HandleMetricServiceImplBase {
private static final Logger logger = LoggerFactory.getLogger(IstioTelemetryGRPCHandler.class);
@Override public void handleMetric(IstioMetricProto.HandleMetricRequest request,
StreamObserver<ReportProto.ReportResult> responseObserver) {
for (IstioMetricProto.InstanceMsg i : request.getInstancesList()) {
logger.debug("Received msg {}", request);
String requestMethod = string(i, "requestMethod");
String requestPath = string(i,"requestPath");
String requestScheme = string(i,"requestScheme");
long responseCode = int64(i, "responseCode");
String reporter = string(i, "reporter");
String protocol = string(i, "apiProtocol");
String endpoint;
boolean status = true;
Protocol netProtocol;
if (protocol.equals("http") || protocol.equals("https")) {
endpoint = requestScheme + "/" + requestMethod + "/" + requestPath;
status = responseCode >= 200 && responseCode < 400;
netProtocol = Protocol.HTTP;
} else {
//grpc
endpoint = protocol + "/" + requestPath;
netProtocol = Protocol.gRPC;
}
Instant requestTime = time(i, "requestTime");
Instant responseTime = time(i, "responseTime");
int latency = Math.toIntExact(Duration.between(requestTime, responseTime).toMillis());
DetectPoint detectPoint;
if (reporter.equals("source")) {
detectPoint = DetectPoint.client;
} else {
detectPoint = DetectPoint.server;
}
ServiceMeshMetric metric = ServiceMeshMetric.newBuilder().setStartTime(requestTime.toEpochMilli())
.setEndTime(responseTime.toEpochMilli()).setSourceServiceName(string(i, "sourceService"))
.setSourceServiceInstance(string(i, "sourceUID")).setDestServiceName(string(i, "destinationService"))
.setDestServiceInstance(string(i, "destinationUID")).setEndpoint(endpoint).setLatency(latency)
.setResponseCode(Math.toIntExact(responseCode)).setStatus(status).setProtocol(netProtocol).setDetectPoint(detectPoint).build();
logger.debug("Transformed metric {}", metric);
}
responseObserver.onNext(ReportProto.ReportResult.newBuilder().build());
responseObserver.onCompleted();
}
private String string(final IstioMetricProto.InstanceMsg instanceMsg, final String key) {
Map<String, TypeProto.Value> map = instanceMsg.getDimensionsMap();
assertDimension(map, key);
return map.get(key).getStringValue();
}
private long int64(final IstioMetricProto.InstanceMsg instanceMsg, final String key) {
Map<String, TypeProto.Value> map = instanceMsg.getDimensionsMap();
assertDimension(map, key);
return map.get(key).getInt64Value();
}
private Instant time(final IstioMetricProto.InstanceMsg instanceMsg, final String key) {
Map<String, TypeProto.Value> map = instanceMsg.getDimensionsMap();
assertDimension(map, key);
Timestamp timestamp = map.get(key).getTimestampValue().getValue();
return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos());
}
private void assertDimension(final Map<String, TypeProto.Value> map, final String key) {
if (!map.containsKey(key)) {
throw new IllegalArgumentException(String.format("Lack dimension %s", key));
}
}
}
......@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.receiver.istio.telemetry.provider;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
......@@ -43,6 +44,8 @@ public class IstioTelemetryReceiverProvider extends ModuleProvider {
}
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
GRPCHandlerRegister service = getManager().find(CoreModule.NAME).getService(GRPCHandlerRegister.class);
service.addHandler(new IstioTelemetryGRPCHandler());
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
......
......@@ -54,6 +54,9 @@ import "mixer/adapter/model/v1beta1/report.proto";
import "policy/v1beta1/value_type.proto";
import "policy/v1beta1/type.proto";
option java_package = "io.istio";
option java_outer_classname = "IstioMetricProto";
option (istio.mixer.adapter.model.v1beta1.template_variety) = TEMPLATE_VARIETY_REPORT;
option (istio.mixer.adapter.model.v1beta1.template_name) = "metric";
......
......@@ -17,6 +17,8 @@ syntax = "proto3";
package istio.mixer.adapter.model.v1beta1;
option go_package="istio.io/api/mixer/adapter/model/v1beta1";
option java_package = "io.istio.api.mixer.adapter.model.v1beta1";
option java_outer_classname = "ExtensionsProto";
import "google/protobuf/descriptor.proto";
......
......@@ -17,6 +17,8 @@ syntax = "proto3";
package istio.mixer.adapter.model.v1beta1;
option go_package="istio.io/api/mixer/adapter/model/v1beta1";
option java_package = "io.istio.api.mixer.adapter.model.v1beta1";
option java_outer_classname = "ReportProto";
import "gogoproto/gogo.proto";
......
......@@ -22,6 +22,8 @@ syntax = "proto3";
package istio.policy.v1beta1;
option go_package="istio.io/api/policy/v1beta1";
option java_package = "io.istio.api.policy.v1beta1";
option java_outer_classname = "TypeProto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
......
......@@ -17,6 +17,8 @@ syntax = "proto3";
package istio.policy.v1beta1;
option go_package="istio.io/api/policy/v1beta1";
option java_package = "io.istio.api.policy.v1beta1";
option java_outer_classname = "ValueTypeProto";
// ValueType describes the types that values in the Istio system can take. These
// are used to describe the type of Attributes at run time, describe the type of
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.istio.telemetry.provider;
import org.junit.Test;
import static org.junit.Assert.*;
public class IstioTelemetryGRPCHandlerTest {
@Test
public void handleMetric() {
}
}
\ No newline at end of file
......@@ -51,6 +51,8 @@ storage:
monthMetricDataTTL: 18 # Unit is month
service-mesh:
default:
istio-telemetry:
default:
query:
graphql:
path: /graphql
......@@ -28,6 +28,7 @@
<logger name="org.apache.zookeeper" level="INFO"/>
<logger name="org.elasticsearch.common.network.IfConfig" level="INFO"/>
<logger name="io.grpc.netty" level="INFO"/>
<logger name="org.apache.skywalking.oap.server.receiver.istio.telemetry" level="DEBUG"/>
<Root level="INFO">
<AppenderRef ref="Console"/>
</Root>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册