未验证 提交 e360ca36 编写于 作者: P pg.yang 提交者: GitHub

Remove handler scan in otel receiver, manual initialization instead (#10303)

上级 f7162d58
......@@ -82,6 +82,7 @@
* Add Python Websocket module component ID(7018).
* [Optional] Optimize single trace query performance by customizing routing in ElasticSearch. SkyWalking trace segments and Zipkin spans are using trace ID for routing. This is OFF by default, controlled by `storage/elasticsearch/enableCustomRouting`.
* Enhance OAP HTTP server to support HTTPS
* Remove handler scan in otel receiver, manual initialization instead
#### UI
......
......@@ -18,43 +18,10 @@
package org.apache.skywalking.oap.server.receiver.otel;
import com.google.common.collect.ImmutableSet;
import com.google.common.reflect.ClassPath;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
public interface Handler {
static List<Handler> all() throws HandlerInitializationException {
ClassPath classpath;
try {
classpath = ClassPath.from(Handler.class.getClassLoader());
} catch (IOException e) {
throw new HandlerInitializationException("failed to load handler classes", e);
}
ImmutableSet<ClassPath.ClassInfo> classes = classpath.getTopLevelClassesRecursive(Handler.class.getPackage().getName());
List<Handler> result = new ArrayList<>();
for (ClassPath.ClassInfo each : classes) {
Class<?> c = each.load();
if (Arrays.stream(c.getInterfaces()).anyMatch(interfaceClass -> interfaceClass.isAssignableFrom(Handler.class))) {
try {
result.add((Handler) c.getDeclaredConstructor().newInstance());
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
throw new HandlerInitializationException("failed to get instances of handler classed", e);
}
}
}
return result;
}
String type();
void active(OtelMetricReceiverConfig config,
MeterSystem meterSystem,
GRPCHandlerRegister grpcHandlerRegister) throws ModuleStartException;
void active() throws ModuleStartException;
}
......@@ -18,20 +18,21 @@
package org.apache.skywalking.oap.server.receiver.otel;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.receiver.otel.oc.OCMetricHandler;
import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryMetricHandler;
import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule;
import static java.util.stream.Collectors.toList;
public class OtelMetricReceiverProvider extends ModuleProvider {
public static final String NAME = "default";
private List<Handler> handlers;
private OtelMetricReceiverConfig config;
@Override
......@@ -45,10 +46,10 @@ public class OtelMetricReceiverProvider extends ModuleProvider {
}
@Override
public ConfigCreator newConfigCreator() {
public ConfigCreator<OtelMetricReceiverConfig> newConfigCreator() {
return new ConfigCreator<OtelMetricReceiverConfig>() {
@Override
public Class type() {
public Class<OtelMetricReceiverConfig> type() {
return OtelMetricReceiverConfig.class;
}
......@@ -61,23 +62,24 @@ public class OtelMetricReceiverProvider extends ModuleProvider {
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
final List<String> enabledHandlers = config.getEnabledHandlers();
List<Handler> handlers = new ArrayList<>();
final OpenTelemetryMetricHandler openTelemetryMetricHandler = new OpenTelemetryMetricHandler(
getManager(), config);
if (enabledHandlers.contains(openTelemetryMetricHandler.type())) {
handlers.add(openTelemetryMetricHandler);
}
final OCMetricHandler ocMetricHandler = new OCMetricHandler(getManager(), config);
if (enabledHandlers.contains(ocMetricHandler.type())) {
handlers.add(ocMetricHandler);
}
this.handlers = handlers;
}
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
if (config.getEnabledHandlers().isEmpty()) {
return;
}
GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME)
.provider()
.getService(GRPCHandlerRegister.class);
final MeterSystem meterSystem = getManager().find(CoreModule.NAME).provider().getService(MeterSystem.class);
final List<Handler> handlers =
Handler.all().stream()
.filter(h -> config.getEnabledHandlers().contains(h.type()))
.collect(toList());
for (Handler h : handlers) {
h.active(config, meterSystem, grpcHandlerRegister);
h.active();
}
}
......
......@@ -33,33 +33,41 @@ import io.opencensus.proto.metrics.v1.SummaryValue;
import io.opencensus.proto.resource.v1.Resource;
import io.vavr.Function1;
import io.vavr.Tuple;
import java.io.IOException;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.meter.analyzer.MetricConvert;
import org.apache.skywalking.oap.meter.analyzer.prometheus.PrometheusMetricConverter;
import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rule;
import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rules;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Counter;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Gauge;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Histogram;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Summary;
import org.apache.skywalking.oap.server.receiver.otel.Handler;
import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverConfig;
import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule;
import static java.util.stream.Collectors.toList;
@RequiredArgsConstructor
public class OCMetricHandler extends MetricsServiceGrpc.MetricsServiceImplBase implements Handler {
private static final String HOST_NAME_LABEL = "node_identifier_host_name";
private List<PrometheusMetricConverter> converters;
private final ModuleManager manager;
private final OtelMetricReceiverConfig config;
@Override public StreamObserver<ExportMetricsServiceRequest> export(
StreamObserver<ExportMetricsServiceResponse> responseObserver) {
return new StreamObserver<ExportMetricsServiceRequest>() {
......@@ -175,10 +183,7 @@ public class OCMetricHandler extends MetricsServiceGrpc.MetricsServiceImplBase i
}
@Override
public void active(
OtelMetricReceiverConfig config,
MeterSystem meterSystem,
GRPCHandlerRegister grpcHandlerRegister)
public void active()
throws ModuleStartException {
final List<String> enabledRules =
Splitter.on(",")
......@@ -193,6 +198,10 @@ public class OCMetricHandler extends MetricsServiceGrpc.MetricsServiceImplBase i
if (rules.isEmpty()) {
return;
}
GRPCHandlerRegister grpcHandlerRegister = manager.find(SharingServerModule.NAME)
.provider()
.getService(GRPCHandlerRegister.class);
final MeterSystem meterSystem = manager.find(CoreModule.NAME).provider().getService(MeterSystem.class);
this.converters = rules.stream().map(r -> new PrometheusMetricConverter(r, meterSystem))
.collect(toList());
grpcHandlerRegister.addHandler(this);
......
......@@ -18,22 +18,32 @@
package org.apache.skywalking.oap.server.receiver.otel.otlp;
import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.Sum;
import io.opentelemetry.proto.metrics.v1.SummaryDataPoint.ValueAtQuantile;
import io.vavr.Function1;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.meter.analyzer.MetricConvert;
import org.apache.skywalking.oap.meter.analyzer.prometheus.PrometheusMetricConverter;
import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rule;
import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rules;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Counter;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Gauge;
......@@ -42,23 +52,22 @@ import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Metric;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Summary;
import org.apache.skywalking.oap.server.receiver.otel.Handler;
import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverConfig;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.Sum;
import io.opentelemetry.proto.metrics.v1.SummaryDataPoint.ValueAtQuantile;
import io.vavr.Function1;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule;
import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
@Slf4j
@RequiredArgsConstructor
public class OpenTelemetryMetricHandler
extends MetricsServiceGrpc.MetricsServiceImplBase
implements Handler {
private final ModuleManager manager;
private final OtelMetricReceiverConfig config;
private static final Map<String, String> LABEL_MAPPINGS =
ImmutableMap
.<String, String>builder()
......@@ -75,14 +84,11 @@ public class OpenTelemetryMetricHandler
}
@Override
public void active(
final OtelMetricReceiverConfig config,
final MeterSystem service,
final GRPCHandlerRegister grpcHandlerRegister) throws ModuleStartException {
public void active() throws ModuleStartException {
final List<String> enabledRules =
Splitter.on(",")
.omitEmptyStrings()
.splitToList(config.getEnabledOtelRules());
.omitEmptyStrings()
.splitToList(config.getEnabledOtelRules());
final List<Rule> rules;
try {
rules = Rules.loadRules("otel-rules", enabledRules);
......@@ -94,9 +100,14 @@ public class OpenTelemetryMetricHandler
return;
}
GRPCHandlerRegister grpcHandlerRegister = manager.find(SharingServerModule.NAME)
.provider()
.getService(GRPCHandlerRegister.class);
final MeterSystem meterSystem = manager.find(CoreModule.NAME).provider().getService(MeterSystem.class);
converters = rules
.stream()
.map(r -> new PrometheusMetricConverter(r, service))
.map(r -> new PrometheusMetricConverter(r, meterSystem))
.collect(toList());
grpcHandlerRegister.addHandler(this);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册