提交 0ecf641c 编写于 作者: wu-sheng's avatar wu-sheng 提交者: 彭勇升 pengys

Support gRPC metric exporter (#2395)

* Support grpc exporter.

* Support scope(all) metric export and finish document.

* Close exporter even in local debug env. And fix CI.

* Remove the system.out

* Fix absent 4.

* Support subscription from remote.

* Add the doc

* Update application.yml
上级 32c4bced
......@@ -220,7 +220,6 @@ The text of each license is the standard Apache 2.0 license.
proto files from envoyproxy/data-plane-api: https://github.com/envoyproxy/data-plane-api Apache 2.0
proto files from prometheus/client_model: https://github.com/prometheus/client_model Apache 2.0
proto files from lyft/protoc-gen-validate: https://github.com/lyft/protoc-gen-validate Apache 2.0
proto files from
========================================================================
......
......@@ -31,6 +31,7 @@ For each official Apache release, there is a complete and independent source cod
* `grpc-java` and `java` folders in **oap-server/server-core/target/generated-sources/protobuf**
* `grpc-java` and `java` folders in **oap-server/server-receiver-plugin/skywalking-istio-telemetry-receiver-plugin/target/generated-sources/protobuf**
* `grpc-java` and `java` folders in **oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/target/generated-sources/protobuf**
* `grpc-java` and `java` folders in **oap-server/exporter/target/generated-sources/protobuf**
* `antlr4` folder in **oap-server/generate-tool-grammar/target/generated-sources**
* `oal` folder in **oap-server/generated-analysis/target/generated-sources**
......
......@@ -77,6 +77,8 @@ which helps you to understand which metric data are in process, also could be us
rules targeting the analysis oal metric objects.
1. [Advanced deployment options](advanced-deployment.md). If you want to deploy backend in very large
scale and support high payload, you may need this.
1. [Metric exporter](metric-exporter.md). Use metric data exporter to forward metric data to 3rd party
system.
## Telemetry for backend
OAP backend cluster itself underlying is a distributed streaming process system. For helping the Ops team,
......
# Metric Exporter
SkyWalking provides basic and most important metric aggregation, alarm and analysis.
In real world, people may want to forward the data to their 3rd party system, for deeper analysis or anything else.
**Metric Exporter** makes that possible.
Metric exporter is an independent module, you need manually active it.
Right now, we provide the following exporters
1. gRPC exporter
## gRPC exporter
gRPC exporter uses SkyWalking native exporter service definition. Here is proto definition.
```proto
service MetricExportService {
rpc export (stream ExportMetricValue) returns (ExportResponse) {
}
rpc subscription (SubscriptionReq) returns (SubscriptionsResp) {
}
}
message ExportMetricValue {
string metricName = 1;
string entityName = 2;
string entityId = 3;
ValueType type = 4;
int64 timeBucket = 5;
int64 longValue = 6;
double doubleValue = 7;
}
message SubscriptionsResp {
repeated string metricNames = 1;
}
enum ValueType {
LONG = 0;
DOUBLE = 1;
}
message SubscriptionReq {
}
message ExportResponse {
}
```
To active the exporter, you should add this into your `application.yml`
```yaml
exporter:
grpc:
targetHost: 127.0.0.1
targetPort: 9870
```
- `targetHost`:`targetPort` is the expected target service address. You could set any gRPC server to receive the data.
- Target gRPC service needs to be standby, otherwise, the OAP starts up failure.
## For target exporter service
### subscription implementation
Return the expected metric name list, all the names must match the OAL script definition. Return empty list, if you want
to export all metrics.
### export implementation
Stream service, all subscribed metrics will be sent to here, based on OAP core schedule. Also, if the OAP deployed as cluster,
then this method will be called concurrently. For metric value, you need follow `#type` to choose `#longValue` or `#doubleValue`.
\ No newline at end of file
......@@ -36,4 +36,34 @@
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<configuration>
<!--
The version of protoc must match protobuf-java. If you don't depend on
protobuf-java directly, you will be transitively depending on the
protobuf-java version that grpc depends on.
-->
<protocArtifact>com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.8.0:exe:${os.detected.classifier}
</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.exporter.provider;
import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.*;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
/**
* @author wusheng
*/
@Setter
public class MetricFormatter {
private ServiceInventoryCache serviceInventoryCache;
private ServiceInstanceInventoryCache serviceInstanceInventoryCache;
private EndpointInventoryCache endpointInventoryCache;
protected String getEntityName(IndicatorMetaInfo meta) {
int scope = meta.getScope();
if (DefaultScopeDefine.inServiceCatalog(scope)) {
return serviceInventoryCache.get(scope).getName();
} else if (DefaultScopeDefine.inServiceInstanceCatalog(scope)) {
return serviceInstanceInventoryCache.get(scope).getName();
} else if (DefaultScopeDefine.inEndpointCatalog(scope)) {
return endpointInventoryCache.get(scope).getName();
} else if (scope == DefaultScopeDefine.ALL) {
return "";
} else {
return null;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.exporter.provider.grpc;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.indicator.*;
import org.apache.skywalking.oap.server.core.exporter.MetricValuesExportService;
import org.apache.skywalking.oap.server.exporter.grpc.*;
import org.apache.skywalking.oap.server.exporter.provider.MetricFormatter;
import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
import org.slf4j.*;
/**
* @author wusheng
*/
public class GRPCExporter extends MetricFormatter implements MetricValuesExportService, IConsumer<GRPCExporter.ExportData> {
private static final Logger logger = LoggerFactory.getLogger(GRPCExporter.class);
private GRPCExporterSetting setting;
private final MetricExportServiceGrpc.MetricExportServiceStub exportServiceFutureStub;
private final MetricExportServiceGrpc.MetricExportServiceBlockingStub blockingStub;
private final DataCarrier exportBuffer;
private final Set<String> subscriptionSet;
public GRPCExporter(GRPCExporterSetting setting) {
this.setting = setting;
GRPCClient client = new GRPCClient(setting.getTargetHost(), setting.getTargetPort());
client.connect();
ManagedChannel channel = client.getChannel();
exportServiceFutureStub = MetricExportServiceGrpc.newStub(channel);
blockingStub = MetricExportServiceGrpc.newBlockingStub(channel);
exportBuffer = new DataCarrier<ExportData>(setting.getBufferChannelNum(), setting.getBufferChannelSize());
exportBuffer.consume(this, 1, 200);
subscriptionSet = new HashSet<>();
}
@Override public void export(IndicatorMetaInfo meta, Indicator indicator) {
if (subscriptionSet.size() == 0 || subscriptionSet.contains(meta.getIndicatorName())) {
exportBuffer.produce(new ExportData(meta, indicator));
}
}
public void initSubscriptionList() {
SubscriptionsResp subscription = blockingStub.subscription(SubscriptionReq.newBuilder().build());
subscription.getMetricNamesList().forEach(subscriptionSet::add);
logger.debug("Get exporter subscription list, {}", subscriptionSet);
}
@Override public void init() {
}
@Override public void consume(List<ExportData> data) {
if (data.size() == 0) {
return;
}
ExportStatus status = new ExportStatus();
StreamObserver<ExportMetricValue> streamObserver = exportServiceFutureStub.export(
new StreamObserver<ExportResponse>() {
@Override public void onNext(ExportResponse response) {
}
@Override public void onError(Throwable throwable) {
status.done();
}
@Override public void onCompleted() {
status.done();
}
}
);
AtomicInteger exportNum = new AtomicInteger();
data.forEach(row -> {
ExportMetricValue.Builder builder = ExportMetricValue.newBuilder();
Indicator indicator = row.getIndicator();
if (indicator instanceof LongValueHolder) {
long value = ((LongValueHolder)indicator).getValue();
builder.setLongValue(value);
builder.setType(ValueType.LONG);
} else if (indicator instanceof IntValueHolder) {
long value = ((IntValueHolder)indicator).getValue();
builder.setLongValue(value);
builder.setType(ValueType.LONG);
} else if (indicator instanceof DoubleValueHolder) {
double value = ((DoubleValueHolder)indicator).getValue();
builder.setDoubleValue(value);
builder.setType(ValueType.DOUBLE);
} else {
return;
}
IndicatorMetaInfo meta = row.getMeta();
builder.setMetricName(meta.getIndicatorName());
String entityName = getEntityName(meta);
if (entityName == null) {
return;
}
builder.setEntityName(entityName);
builder.setEntityId(meta.getId());
builder.setTimeBucket(indicator.getTimeBucket());
streamObserver.onNext(builder.build());
exportNum.getAndIncrement();
});
streamObserver.onCompleted();
long sleepTime = 0;
long cycle = 100L;
/**
* For memory safe of oap, we must wait for the peer confirmation.
*/
while (!status.isDone()) {
try {
sleepTime += cycle;
Thread.sleep(cycle);
} catch (InterruptedException e) {
}
if (sleepTime > 2000L) {
logger.warn("Export {} metric(s) to {}:{}, wait {} milliseconds.",
exportNum.get(), setting.getTargetHost(), setting.getTargetPort(), sleepTime);
cycle = 2000L;
}
}
logger.debug("Exported {} metric(s) to {}:{} in {} milliseconds.",
exportNum.get(), setting.getTargetHost(), setting.getTargetPort(), sleepTime);
}
@Override public void onError(List<ExportData> data, Throwable t) {
logger.error(t.getMessage(), t);
}
@Override public void onExit() {
}
@Getter(AccessLevel.PRIVATE)
public class ExportData {
private IndicatorMetaInfo meta;
private Indicator indicator;
public ExportData(IndicatorMetaInfo meta, Indicator indicator) {
this.meta = meta;
this.indicator = indicator;
}
}
private class ExportStatus {
private boolean done = false;
private void done() {
done = true;
}
public boolean isDone() {
return done;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.exporter.provider.grpc;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.exporter.*;
import org.apache.skywalking.oap.server.library.module.*;
/**
* @author wusheng
*/
public class GRPCExporterProvider extends ModuleProvider {
private GRPCExporterSetting setting;
private GRPCExporter exporter;
@Override public String name() {
return "grpc";
}
@Override public Class<? extends ModuleDefine> module() {
return ExporterModule.class;
}
@Override public ModuleConfig createConfigBeanIfAbsent() {
setting = new GRPCExporterSetting();
return setting;
}
@Override public void prepare() throws ServiceNotProvidedException, ModuleStartException {
exporter = new GRPCExporter(setting);
this.registerServiceImplementation(MetricValuesExportService.class, exporter);
}
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
exporter.setServiceInventoryCache(getManager().find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class));
exporter.setServiceInstanceInventoryCache(getManager().find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class));
exporter.setEndpointInventoryCache(getManager().find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class));
exporter.initSubscriptionList();
}
@Override public String[] requiredModules() {
return new String[] {CoreModule.NAME};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.exporter.provider.grpc;
import lombok.*;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
/**
* @author wusheng
*/
@Setter
@Getter
public class GRPCExporterSetting extends ModuleConfig {
private String targetHost;
private int targetPort;
private int bufferChannelSize = 20000;
private int bufferChannelNum = 2;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
syntax = "proto3";
option java_multiple_files = true;
option java_package = "org.apache.skywalking.oap.server.exporter.grpc";
service MetricExportService {
rpc export (stream ExportMetricValue) returns (ExportResponse) {
}
rpc subscription (SubscriptionReq) returns (SubscriptionsResp) {
}
}
message ExportMetricValue {
string metricName = 1;
string entityName = 2;
string entityId = 3;
ValueType type = 4;
int64 timeBucket = 5;
int64 longValue = 6;
double doubleValue = 7;
}
message SubscriptionsResp {
repeated string metricNames = 1;
}
enum ValueType {
LONG = 0;
DOUBLE = 1;
}
message SubscriptionReq {
}
message ExportResponse {
}
......@@ -15,3 +15,5 @@
# limitations under the License.
#
#
org.apache.skywalking.oap.server.exporter.provider.grpc.GRPCExporterProvider
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.exporter.provider.grpc;
import io.grpc.stub.StreamObserver;
import org.apache.skywalking.oap.server.exporter.grpc.*;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.*;
public class ExporterMockReceiver {
public static void main(String[] args) throws ServerException, InterruptedException {
GRPCServer server = new GRPCServer("127.0.0.1", 9870);
server.initialize();
server.addHandler(new MockHandler());
server.start();
while (true) {
Thread.sleep(20000L);
}
}
public static class MockHandler extends MetricExportServiceGrpc.MetricExportServiceImplBase implements GRPCHandler {
@Override public StreamObserver<ExportMetricValue> export(StreamObserver<ExportResponse> responseObserver) {
return new StreamObserver<ExportMetricValue>() {
@Override public void onNext(ExportMetricValue value) {
}
@Override public void onError(Throwable throwable) {
responseObserver.onError(throwable);
}
@Override public void onCompleted() {
responseObserver.onCompleted();
}
};
}
@Override
public void subscription(SubscriptionReq request, StreamObserver<SubscriptionsResp> responseObserver) {