未验证 提交 d9617cbe 编写于 作者: A Ax1an 提交者: GitHub

Add telemetry data about metrics in, metrics scraping and trace in metrics to...

Add telemetry data about metrics in, metrics scraping and trace in metrics to zipkin receiver. (#6516)
上级 ad94caff
......@@ -45,6 +45,7 @@ Release Notes.
* Save Envoy http access logs when error occurs.
* Fix wrong `service_instance_sla` setting in the `topology-instance.yml`.
* Fix wrong metrics name setting in the `self-observability.yml`.
* Add telemetry data about metrics in, metrics scraping and trace in metrics to zipkin receiver.
#### UI
* Update selector scroller to show in all pages.
......
......@@ -36,6 +36,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
@Slf4j
public class KafkaFetcherProvider extends ModuleProvider {
......@@ -89,6 +90,7 @@ public class KafkaFetcherProvider extends ModuleProvider {
@Override
public String[] requiredModules() {
return new String[] {
TelemetryModule.NAME,
AnalyzerModule.NAME,
LogAnalyzerModule.NAME,
CoreModule.NAME
......
......@@ -27,6 +27,11 @@ import org.apache.skywalking.oap.server.analyzer.module.AnalyzerModule;
import org.apache.skywalking.oap.server.analyzer.provider.meter.process.IMeterProcessService;
import org.apache.skywalking.oap.server.analyzer.provider.meter.process.MeterProcessor;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
/**
* A handler deserializes the message of meter system data and pushes it to downstream.
......@@ -35,19 +40,38 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
public class MeterServiceHandler implements KafkaHandler {
private KafkaFetcherConfig config;
private IMeterProcessService processService;
private final HistogramMetrics histogram;
private final CounterMetrics errorCounter;
public MeterServiceHandler(ModuleManager manager, KafkaFetcherConfig config) {
this.config = config;
this.processService = manager.find(AnalyzerModule.NAME).provider().getService(IMeterProcessService.class);
MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
histogram = metricsCreator.createHistogramMetric(
"meter_in_latency", "The process latency of meter",
new MetricsTag.Keys("protocol"), new MetricsTag.Values("kafka-fetcher")
);
errorCounter = metricsCreator.createCounter("meter_analysis_error_count", "The error number of meter analysis",
new MetricsTag.Keys("protocol"),
new MetricsTag.Values("kafka-fetcher")
);
}
@Override
public void handle(final ConsumerRecord<String, Bytes> record) {
try {
MeterDataCollection meterDataCollection = MeterDataCollection.parseFrom(record.value().get());
MeterProcessor processor = processService.createProcessor();
meterDataCollection.getMeterDataList().forEach(meterData -> processor.read(meterData));
meterDataCollection.getMeterDataList().forEach(meterData -> {
try (HistogramMetrics.Timer ignored = histogram.createTimer()) {
processor.read(meterData);
} catch (Exception e) {
errorCounter.inc();
log.error(e.getMessage(), e);
}
});
processor.process();
} catch (Exception e) {
......
......@@ -52,6 +52,11 @@ import org.apache.skywalking.oap.server.library.util.prometheus.Parser;
import org.apache.skywalking.oap.server.library.util.prometheus.Parsers;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Metric;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.MetricFamily;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
@Slf4j
public class PrometheusFetcherProvider extends ModuleProvider {
......@@ -62,6 +67,10 @@ public class PrometheusFetcherProvider extends ModuleProvider {
private ScheduledExecutorService ses;
private HistogramMetrics histogram;
private CounterMetrics errorCounter;
public PrometheusFetcherProvider() {
config = new PrometheusFetcherConfig();
}
......@@ -89,6 +98,16 @@ public class PrometheusFetcherProvider extends ModuleProvider {
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
MetricsCreator metricsCreator = getManager().find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
histogram = metricsCreator.createHistogramMetric(
"metrics_fetcher_latency", "The process latency of metrics scraping",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
);
errorCounter = metricsCreator.createCounter("metrics_fetcher_error_count", "The error number of metrics scraping",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
);
}
@Override
......@@ -103,44 +122,49 @@ public class PrometheusFetcherProvider extends ModuleProvider {
private final PrometheusMetricConverter converter = new PrometheusMetricConverter(r, service);
@Override public void run() {
if (Objects.isNull(r.getStaticConfig())) {
return;
}
StaticConfig sc = r.getStaticConfig();
long now = System.currentTimeMillis();
converter.toMeter(sc.getTargets().stream()
.map(CheckedFunction1.liftTry(target -> {
URI url = new URI(target.getUrl());
URI targetURL = url.resolve(r.getMetricsPath());
String content = HttpClient.builder().url(targetURL.toString()).caFilePath(target.getSslCaFilePath()).build().request();
List<Metric> result = new ArrayList<>();
try (InputStream targetStream = new ByteArrayInputStream(content.getBytes(Charsets.UTF_8))) {
Parser p = Parsers.text(targetStream);
MetricFamily mf;
while ((mf = p.parse(now)) != null) {
mf.getMetrics().forEach(metric -> {
if (Objects.isNull(sc.getLabels())) {
return;
try (HistogramMetrics.Timer ignored = histogram.createTimer()) {
if (Objects.isNull(r.getStaticConfig())) {
return;
}
StaticConfig sc = r.getStaticConfig();
long now = System.currentTimeMillis();
converter.toMeter(sc.getTargets().stream()
.map(CheckedFunction1.liftTry(target -> {
URI url = new URI(target.getUrl());
URI targetURL = url.resolve(r.getMetricsPath());
String content = HttpClient.builder().url(targetURL.toString()).caFilePath(target.getSslCaFilePath()).build().request();
List<Metric> result = new ArrayList<>();
try (InputStream targetStream = new ByteArrayInputStream(content.getBytes(Charsets.UTF_8))) {
Parser p = Parsers.text(targetStream);
MetricFamily mf;
while ((mf = p.parse(now)) != null) {
mf.getMetrics().forEach(metric -> {
if (Objects.isNull(sc.getLabels())) {
return;
}
Map<String, String> extraLabels = Maps.newHashMap(sc.getLabels());
extraLabels.put("instance", target.getUrl());
extraLabels.forEach((key, value) -> {
if (metric.getLabels().containsKey(key)) {
metric.getLabels().put("exported_" + key, metric.getLabels().get(key));
}
metric.getLabels().put(key, value);
});
});
result.addAll(mf.getMetrics());
}
Map<String, String> extraLabels = Maps.newHashMap(sc.getLabels());
extraLabels.put("instance", target.getUrl());
extraLabels.forEach((key, value) -> {
if (metric.getLabels().containsKey(key)) {
metric.getLabels().put("exported_" + key, metric.getLabels().get(key));
}
metric.getLabels().put(key, value);
});
});
result.addAll(mf.getMetrics());
}
}
if (log.isDebugEnabled()) {
log.debug("Fetch metrics from prometheus: {}", result);
}
return result;
}))
.flatMap(tryIt -> MetricConvert.log(tryIt, "Load metric"))
.flatMap(Collection::stream));
}
if (log.isDebugEnabled()) {
log.debug("Fetch metrics from prometheus: {}", result);
}
return result;
}))
.flatMap(tryIt -> MetricConvert.log(tryIt, "Load metric"))
.flatMap(Collection::stream));
} catch (Exception e) {
errorCounter.inc();
log.error(e.getMessage(), e);
}
}
}, 0L, Duration.parse(r.getFetcherInterval()).getSeconds(), TimeUnit.SECONDS);
});
......@@ -148,6 +172,9 @@ public class PrometheusFetcherProvider extends ModuleProvider {
@Override
public String[] requiredModules() {
return new String[] {CoreModule.NAME};
return new String[] {
TelemetryModule.NAME,
CoreModule.NAME
};
}
}
......@@ -30,6 +30,7 @@ import org.apache.skywalking.oap.server.receiver.meter.module.MeterReceiverModul
import org.apache.skywalking.oap.server.receiver.meter.provider.handler.MeterServiceHandler;
import org.apache.skywalking.oap.server.receiver.meter.provider.handler.MeterServiceHandlerCompat;
import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
public class MeterReceiverProvider extends ModuleProvider {
......@@ -62,7 +63,7 @@ public class MeterReceiverProvider extends ModuleProvider {
GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME)
.provider()
.getService(GRPCHandlerRegister.class);
MeterServiceHandler meterServiceHandlerCompat = new MeterServiceHandler(processService);
MeterServiceHandler meterServiceHandlerCompat = new MeterServiceHandler(getManager(), processService);
grpcHandlerRegister.addHandler(meterServiceHandlerCompat);
grpcHandlerRegister.addHandler(new MeterServiceHandlerCompat(meterServiceHandlerCompat));
}
......@@ -74,6 +75,7 @@ public class MeterReceiverProvider extends ModuleProvider {
@Override
public String[] requiredModules() {
return new String[] {
TelemetryModule.NAME,
CoreModule.NAME,
AnalyzerModule.NAME,
SharingServerModule.NAME
......
......@@ -25,7 +25,13 @@ import org.apache.skywalking.apm.network.language.agent.v3.MeterData;
import org.apache.skywalking.apm.network.language.agent.v3.MeterReportServiceGrpc;
import org.apache.skywalking.oap.server.analyzer.provider.meter.process.IMeterProcessService;
import org.apache.skywalking.oap.server.analyzer.provider.meter.process.MeterProcessor;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
/**
* Meter protocol receiver, collect and process the meters.
......@@ -34,9 +40,22 @@ import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
public class MeterServiceHandler extends MeterReportServiceGrpc.MeterReportServiceImplBase implements GRPCHandler {
private final IMeterProcessService processService;
private final HistogramMetrics histogram;
private final CounterMetrics errorCounter;
public MeterServiceHandler(IMeterProcessService processService) {
public MeterServiceHandler(ModuleManager manager, IMeterProcessService processService) {
this.processService = processService;
MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
histogram = metricsCreator.createHistogramMetric(
"meter_in_latency", "The process latency of meter",
new MetricsTag.Keys("protocol"), new MetricsTag.Values("grpc")
);
errorCounter = metricsCreator.createCounter("meter_analysis_error_count", "The error number of meter analysis",
new MetricsTag.Keys("protocol"),
new MetricsTag.Values("grpc")
);
}
@Override
......@@ -45,7 +64,12 @@ public class MeterServiceHandler extends MeterReportServiceGrpc.MeterReportServi
return new StreamObserver<MeterData>() {
@Override
public void onNext(MeterData meterData) {
processor.read(meterData);
try (HistogramMetrics.Timer ignored = histogram.createTimer()) {
processor.read(meterData);
} catch (Exception e) {
errorCounter.inc();
log.error(e.getMessage(), e);
}
}
@Override
......
......@@ -29,6 +29,7 @@ import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServerConfig;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV1JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV2JettyHandler;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
public class ZipkinReceiverProvider extends ModuleProvider {
public static final String NAME = "default";
......@@ -91,6 +92,9 @@ public class ZipkinReceiverProvider extends ModuleProvider {
@Override
public String[] requiredModules() {
return new String[] {CoreModule.NAME};
return new String[] {
TelemetryModule.NAME,
CoreModule.NAME
};
}
}
......@@ -27,6 +27,11 @@ import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.jetty.JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import zipkin2.codec.SpanBytesDecoder;
@Slf4j
......@@ -34,11 +39,23 @@ public class SpanV1JettyHandler extends JettyHandler {
private final ZipkinReceiverConfig config;
private final SourceReceiver sourceReceiver;
private final NamingControl namingControl;
private final HistogramMetrics histogram;
private final CounterMetrics errorCounter;
public SpanV1JettyHandler(ZipkinReceiverConfig config, ModuleManager manager) {
sourceReceiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
namingControl = manager.find(CoreModule.NAME).provider().getService(NamingControl.class);
this.config = config;
MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
histogram = metricsCreator.createHistogramMetric(
"trace_in_latency", "The process latency of trace data",
new MetricsTag.Keys("protocol"), new MetricsTag.Values("zipkin-v1")
);
errorCounter = metricsCreator.createCounter("trace_analysis_error_count", "The error number of trace analysis",
new MetricsTag.Keys("protocol"), new MetricsTag.Values("zipkin-v1")
);
}
@Override
......@@ -51,7 +68,7 @@ public class SpanV1JettyHandler extends JettyHandler {
response.setContentType("application/json");
response.setCharacterEncoding("utf-8");
try {
try (HistogramMetrics.Timer ignored = histogram.createTimer()) {
String type = request.getHeader("Content-Type");
int encode = type != null && type.contains("/x-thrift") ? SpanEncode.THRIFT : SpanEncode.JSON_V1;
......@@ -64,7 +81,7 @@ public class SpanV1JettyHandler extends JettyHandler {
response.setStatus(202);
} catch (Exception e) {
response.setStatus(500);
errorCounter.inc();
log.error(e.getMessage(), e);
}
}
......
......@@ -27,6 +27,11 @@ import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.jetty.JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import zipkin2.codec.SpanBytesDecoder;
@Slf4j
......@@ -35,11 +40,23 @@ public class SpanV2JettyHandler extends JettyHandler {
private final ZipkinReceiverConfig config;
private final SourceReceiver sourceReceiver;
private final NamingControl namingControl;
private final HistogramMetrics histogram;
private final CounterMetrics errorCounter;
public SpanV2JettyHandler(ZipkinReceiverConfig config, ModuleManager manager) {
sourceReceiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
namingControl = manager.find(CoreModule.NAME).provider().getService(NamingControl.class);
this.config = config;
MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
histogram = metricsCreator.createHistogramMetric(
"trace_in_latency", "The process latency of trace data",
new MetricsTag.Keys("protocol"), new MetricsTag.Values("zipkin-v2")
);
errorCounter = metricsCreator.createCounter("trace_analysis_error_count", "The error number of trace analysis",
new MetricsTag.Keys("protocol"), new MetricsTag.Values("zipkin-v2")
);
}
@Override
......@@ -52,7 +69,7 @@ public class SpanV2JettyHandler extends JettyHandler {
response.setContentType("application/json");
response.setCharacterEncoding("utf-8");
try {
try (HistogramMetrics.Timer ignored = histogram.createTimer()) {
String type = request.getHeader("Content-Type");
int encode = type != null && type.contains("/x-protobuf") ? SpanEncode.PROTO3 : SpanEncode.JSON_V2;
......@@ -65,7 +82,7 @@ public class SpanV2JettyHandler extends JettyHandler {
response.setStatus(202);
} catch (Exception e) {
response.setStatus(500);
errorCounter.inc();
log.error(e.getMessage(), e);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册