From 601b472ef6ebe5c0d026a5afcc5e6856683c0328 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=99=9F=20Wu=20Sheng?= Date: Sat, 30 Mar 2019 10:36:36 -0700 Subject: [PATCH] Support Backend acts as pure Zipkin collector (#2424) * Codebase for zipkin span persistence. * Fix missing fields in storage. * Miss the latency field. * Finish some tests. * Fix wrong latency. * Finish doc and reset application.yml * Make the description more clear. --- docs/en/setup/backend/backend-receivers.md | 28 ++- docs/en/setup/backend/backend-storage.md | 20 ++ .../server/core/query/TraceQueryService.java | 20 +- .../server/core/query/entity/KeyValue.java | 8 + .../core/source/DefaultScopeDefine.java | 1 + .../core/storage/query/ITraceQueryDAO.java | 8 + .../zipkin-receiver-plugin/pom.xml | 5 + .../receiver/zipkin/CoreRegisterLinker.java | 11 +- .../receiver/zipkin/ZipkinReceiverConfig.java | 47 +--- .../zipkin/ZipkinReceiverProvider.java | 25 +- .../Receiver2AnalysisBridge.java | 8 +- .../analysis/ZipkinSkyWalkingTransfer.java | 43 ++++ .../ZipkinTraceOSInfoBuilder.java | 2 +- .../{ => analysis}/cache/CacheFactory.java | 4 +- .../{ => analysis}/cache/ISpanCache.java | 2 +- .../cache/caffeine/CaffeineSpanCache.java | 8 +- .../{ => analysis}/data/SkyWalkingTrace.java | 2 +- .../{ => analysis}/data/ZipkinTrace.java | 2 +- .../transform/SegmentBuilder.java | 7 +- .../transform/SegmentListener.java | 4 +- .../transform/Zipkin2SkyWalkingTransfer.java | 6 +- .../receiver/zipkin/handler/SpanEncode.java | 45 ++++ .../zipkin/handler/SpanProcessor.java | 46 ++-- .../zipkin/handler/SpanV1JettyHandler.java | 29 ++- .../zipkin/handler/SpanV2JettyHandler.java | 25 +- .../receiver/zipkin/trace/SpanForward.java | 109 +++++++++ .../SpringSleuthSegmentBuilderTest.java | 4 +- oap-server/server-starter/pom.xml | 5 + oap-server/server-storage-plugin/pom.xml | 1 + .../StorageModuleElasticsearchConfig.java | 4 +- .../StorageModuleElasticsearchProvider.java | 4 +- .../elasticsearch/query/TraceQueryEsDAO.java | 4 + .../plugin/jdbc/h2/dao/H2TraceQueryDAO.java | 4 + .../storage-zipkin-plugin/pom.xml | 43 ++++ .../storage/plugin/zipkin/ZipkinSpan.java | 52 ++++ .../plugin/zipkin/ZipkinSpanRecord.java | 112 +++++++++ .../zipkin/ZipkinSpanRecordDispatcher.java | 48 ++++ ...kinStorageModuleElasticsearchProvider.java | 57 +++++ .../elasticsearch/ZipkinTraceQueryEsDAO.java | 222 ++++++++++++++++++ ...g.oap.server.library.module.ModuleProvider | 19 ++ 40 files changed, 968 insertions(+), 126 deletions(-) rename oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/{ => analysis}/Receiver2AnalysisBridge.java (82%) create mode 100644 oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinSkyWalkingTransfer.java rename oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/{ => analysis}/ZipkinTraceOSInfoBuilder.java (94%) rename oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/{ => analysis}/cache/CacheFactory.java (89%) rename oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/{ => analysis}/cache/ISpanCache.java (92%) rename oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/{ => analysis}/cache/caffeine/CaffeineSpanCache.java (91%) rename oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/{ => analysis}/data/SkyWalkingTrace.java (96%) rename oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/{ => analysis}/data/ZipkinTrace.java (95%) rename oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/{ => analysis}/transform/SegmentBuilder.java (98%) rename oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/{ => analysis}/transform/SegmentListener.java (84%) rename oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/{ => analysis}/transform/Zipkin2SkyWalkingTransfer.java (86%) create mode 100644 oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanEncode.java create mode 100644 oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java rename oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/{ => analysis}/transform/SpringSleuthSegmentBuilderTest.java (98%) create mode 100644 oap-server/server-storage-plugin/storage-zipkin-plugin/pom.xml create mode 100644 oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpan.java create mode 100644 oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java create mode 100644 oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecordDispatcher.java create mode 100644 oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinStorageModuleElasticsearchProvider.java create mode 100644 oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java create mode 100644 oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider diff --git a/docs/en/setup/backend/backend-receivers.md b/docs/en/setup/backend/backend-receivers.md index 237240c963..a4302a99e1 100644 --- a/docs/en/setup/backend/backend-receivers.md +++ b/docs/en/setup/backend/backend-receivers.md @@ -11,8 +11,7 @@ We have following receivers, and `default` implementors are provided in our Apac 1. **receiver-jvm**. gRPC services accept JVM metric data. 1. **istio-telemetry**. Istio telemetry is from Istio official bypass adaptor, this receiver match its gRPC services. 1. **envoy-metric**. Envoy `metrics_service` supported by this receiver. OAL script support all GAUGE type metrics. -1. **receiver_zipkin**. HTTP service accepts Span in Zipkin v1 and v2 formats. Notice, this receiver only -works as expected in backend single node mode. Cluster mode is not supported. Welcome anyone to improve this. +1. **receiver_zipkin**. See [details](#zipkin-receiver). The sample settings of these receivers should be already in default `application.yml`, and also list here ```yaml @@ -59,4 +58,27 @@ receiver-sharing-server: ``` Notice, if you add these settings, make sure they are not as same as core module, -because gRPC/HTTP servers of core are still used for UI and OAP internal communications. \ No newline at end of file +because gRPC/HTTP servers of core are still used for UI and OAP internal communications. + +## Zipkin receiver +Zipkin receiver could work in two different mode. +1. Tracing mode(default). Tracing mode is that, skywalking OAP acts like zipkin collector, +fully supports Zipkin v1/v2 formats through HTTP service, +also provide persistence and query in skywalking UI. +But it wouldn't analysis metric from them. In most case, I suggest you could use this feature, when metrics come from service mesh. +Notice, in this mode, Zipkin receiver requires `zipkin-elasticsearch` storage implementation active. +Read [this](backend-storage.md#elasticsearch-6-with-zipkin-trace-extension) to know +how to active. +1. Analysis mode(Not production ready), receive Zipkin v1/v2 formats through HTTP service. Transform the trace to skywalking +native format, and analysis like skywalking trace. This feature can't work in production env right now, +because of Zipkin tag/endpoint value unpredictable, we can't make sure it fits production env requirements. + +Active `analysis mode`, you should set `needAnalysis` config. +```yaml +receiver_zipkin: + default: + host: ${SW_RECEIVER_ZIPKIN_HOST:0.0.0.0} + port: ${SW_RECEIVER_ZIPKIN_PORT:9411} + contextPath: ${SW_RECEIVER_ZIPKIN_CONTEXT_PATH:/} + needAnalysis: true +``` \ No newline at end of file diff --git a/docs/en/setup/backend/backend-storage.md b/docs/en/setup/backend/backend-storage.md index b9baa3e7a2..f769690fb9 100644 --- a/docs/en/setup/backend/backend-storage.md +++ b/docs/en/setup/backend/backend-storage.md @@ -49,6 +49,26 @@ storage: concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests ``` +### ElasticSearch 6 with Zipkin trace extension +This implementation shares most of `elasticsearch`, just extend to support zipkin span storage. +It has all same configs. +```yaml +storage: + zipkin-elasticsearch: + nameSpace: ${SW_NAMESPACE:""} + clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200} + user: ${SW_ES_USER:""} + password: ${SW_ES_PASSWORD:""} + indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2} + indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0} + # Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html + bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests + bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb + flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests + concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests +``` + + ### About Namespace When namespace is set, names of all indexes in ElasticSearch will use it as prefix. diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java index b735525013..d0a9df7bae 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java @@ -103,14 +103,18 @@ public class TraceQueryService implements Service { Trace trace = new Trace(); List segmentRecords = getTraceQueryDAO().queryByTraceId(traceId); - for (SegmentRecord segment : segmentRecords) { - if (nonNull(segment)) { - if (segment.getVersion() == 2) { - SegmentObject segmentObject = SegmentObject.parseFrom(segment.getDataBinary()); - trace.getSpans().addAll(buildSpanV2List(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList())); - } else { - TraceSegmentObject segmentObject = TraceSegmentObject.parseFrom(segment.getDataBinary()); - trace.getSpans().addAll(buildSpanList(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList())); + if (segmentRecords.isEmpty()) { + trace.getSpans().addAll(getTraceQueryDAO().doFlexibleTraceQuery(traceId)); + } else { + for (SegmentRecord segment : segmentRecords) { + if (nonNull(segment)) { + if (segment.getVersion() == 2) { + SegmentObject segmentObject = SegmentObject.parseFrom(segment.getDataBinary()); + trace.getSpans().addAll(buildSpanV2List(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList())); + } else { + TraceSegmentObject segmentObject = TraceSegmentObject.parseFrom(segment.getDataBinary()); + trace.getSpans().addAll(buildSpanList(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList())); + } } } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/KeyValue.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/KeyValue.java index 46e355ef66..b6cbd953a4 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/KeyValue.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/KeyValue.java @@ -28,4 +28,12 @@ import lombok.*; public class KeyValue { private String key; private String value; + + public KeyValue(String key, String value) { + this.key = key; + this.value = value; + } + + public KeyValue() { + } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java index 78b3565d9f..aec1665f0d 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java @@ -59,6 +59,7 @@ public class DefaultScopeDefine { public static final int SERVICE_INSTANCE_CLR_GC = 20; public static final int SERVICE_INSTANCE_CLR_THREAD = 21; public static final int ENVOY_INSTANCE_METRIC = 22; + public static final int ZIPKIN_SPAN = 23; /** * Catalog of scope, the indicator processor could use this to group all generated indicators by oal tool. diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java index fc98d03ba4..6517d2f8bc 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java @@ -34,4 +34,12 @@ public interface ITraceQueryDAO extends Service { int limit, int from, TraceState traceState, QueryOrder queryOrder) throws IOException; List queryByTraceId(String traceId) throws IOException; + + /** + * This method gives more flexible for unnative + * @param traceId + * @return + * @throws IOException + */ + List doFlexibleTraceQuery(String traceId) throws IOException; } diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml index d2adbc1fa6..a332f7378e 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml @@ -35,6 +35,11 @@ skywalking-trace-receiver-plugin ${project.version} + + org.apache.skywalking + storage-zipkin-plugin + ${project.version} + org.apache.skywalking skywalking-register-receiver-plugin diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/CoreRegisterLinker.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/CoreRegisterLinker.java index 24591999dc..57e4bc706d 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/CoreRegisterLinker.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/CoreRegisterLinker.java @@ -19,14 +19,14 @@ package org.apache.skywalking.oap.server.receiver.zipkin; import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister; -import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.*; import org.apache.skywalking.oap.server.library.module.ModuleManager; public class CoreRegisterLinker { private static volatile ModuleManager MODULE_MANAGER; private static volatile IServiceInventoryRegister SERVICE_INVENTORY_REGISTER; private static volatile IServiceInstanceInventoryRegister SERVICE_INSTANCE_INVENTORY_REGISTER; + private static volatile IEndpointInventoryRegister ENDPOINT_INVENTORY_REGISTER; public static void setModuleManager(ModuleManager moduleManager) { CoreRegisterLinker.MODULE_MANAGER = moduleManager; @@ -45,4 +45,11 @@ public class CoreRegisterLinker { } return SERVICE_INSTANCE_INVENTORY_REGISTER; } + + public static IEndpointInventoryRegister getEndpointInventoryRegister() { + if (ENDPOINT_INVENTORY_REGISTER == null) { + ENDPOINT_INVENTORY_REGISTER = MODULE_MANAGER.find(CoreModule.NAME).provider().getService(IEndpointInventoryRegister.class); + } + return ENDPOINT_INVENTORY_REGISTER; + } } diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java index a5e14af22b..cf9ff22ff5 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java @@ -18,57 +18,20 @@ package org.apache.skywalking.oap.server.receiver.zipkin; +import lombok.*; import org.apache.skywalking.oap.server.library.module.ModuleConfig; /** * @author wusheng */ +@Setter +@Getter public class ZipkinReceiverConfig extends ModuleConfig { private String host; private int port; private String contextPath; - private int expireTime = 20; - private int maxCacheSize = 1_000_000; - - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - - public String getContextPath() { - return contextPath; - } - - public void setContextPath(String contextPath) { - this.contextPath = contextPath; - } - - public int getExpireTime() { - return expireTime; - } - - public void setExpireTime(int expireTime) { - this.expireTime = expireTime; - } - - public int getMaxCacheSize() { - return maxCacheSize; - } - - public void setMaxCacheSize(int maxCacheSize) { - this.maxCacheSize = maxCacheSize; - } + private boolean needAnalysis = false; + private boolean registerZipkinEndpoint = true; } diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java index 2eb20791b6..50e1e63bf6 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java @@ -18,6 +18,7 @@ package org.apache.skywalking.oap.server.receiver.zipkin; +import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.library.module.ModuleConfig; import org.apache.skywalking.oap.server.library.module.ModuleDefine; import org.apache.skywalking.oap.server.library.module.ModuleProvider; @@ -27,9 +28,10 @@ import org.apache.skywalking.oap.server.library.server.ServerException; import org.apache.skywalking.oap.server.library.server.jetty.JettyServer; import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.*; import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV1JettyHandler; import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV2JettyHandler; -import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.Zipkin2SkyWalkingTransfer; /** * @author wusheng @@ -65,12 +67,14 @@ public class ZipkinReceiverProvider extends ModuleProvider { jettyServer = new JettyServer(config.getHost(), config.getPort(), config.getContextPath()); jettyServer.initialize(); - jettyServer.addHandler(new SpanV1JettyHandler(config)); - jettyServer.addHandler(new SpanV2JettyHandler(config)); + jettyServer.addHandler(new SpanV1JettyHandler(config, getManager())); + jettyServer.addHandler(new SpanV2JettyHandler(config, getManager())); - ISegmentParserService segmentParseService = getManager().find(TraceModule.NAME).provider().getService(ISegmentParserService.class); - Receiver2AnalysisBridge bridge = new Receiver2AnalysisBridge(segmentParseService); - Zipkin2SkyWalkingTransfer.INSTANCE.addListener(bridge); + if (config.isNeedAnalysis()) { + ISegmentParserService segmentParseService = getManager().find(TraceModule.NAME).provider().getService(ISegmentParserService.class); + Receiver2AnalysisBridge bridge = new Receiver2AnalysisBridge(segmentParseService); + Zipkin2SkyWalkingTransfer.INSTANCE.addListener(bridge); + } } @Override public void notifyAfterCompleted() throws ModuleStartException { @@ -82,6 +86,13 @@ public class ZipkinReceiverProvider extends ModuleProvider { } @Override public String[] requiredModules() { - return new String[] {TraceModule.NAME}; + if (config.isNeedAnalysis()) { + return new String[] {TraceModule.NAME}; + } else { + /** + * In pure trace status, we don't need the trace receiver. + */ + return new String[] {CoreModule.NAME}; + } } } diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/Receiver2AnalysisBridge.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/Receiver2AnalysisBridge.java similarity index 82% rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/Receiver2AnalysisBridge.java rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/Receiver2AnalysisBridge.java index 1381da3ed4..53051e57a4 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/Receiver2AnalysisBridge.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/Receiver2AnalysisBridge.java @@ -16,12 +16,12 @@ * */ -package org.apache.skywalking.oap.server.receiver.zipkin; +package org.apache.skywalking.oap.server.receiver.zipkin.analysis; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService; -import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace; -import org.apache.skywalking.oap.server.receiver.zipkin.transform.SegmentListener; -import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.SegmentListener; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.Zipkin2SkyWalkingTransfer; /** * Send the segments to Analysis module, like receiving segments from native SkyWalking agents. diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinSkyWalkingTransfer.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinSkyWalkingTransfer.java new file mode 100644 index 0000000000..c4ec894e81 --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinSkyWalkingTransfer.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.zipkin.analysis; + +import java.util.List; +import org.apache.skywalking.oap.server.receiver.zipkin.*; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.CacheFactory; +import zipkin2.Span; + +public class ZipkinSkyWalkingTransfer { + public void doTransfer(ZipkinReceiverConfig config, List spanList) { + spanList.forEach(span -> { + // In Zipkin, the local service name represents the application owner. + String applicationCode = span.localServiceName(); + if (applicationCode != null) { + int applicationId = CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(applicationCode, null); + if (applicationId != 0) { + CoreRegisterLinker.getServiceInstanceInventoryRegister().getOrCreate(applicationId, applicationCode, applicationCode, + span.timestampAsLong(), + ZipkinTraceOSInfoBuilder.getOSInfoForZipkin(applicationCode)); + } + } + + CacheFactory.INSTANCE.get(config).addSpan(span); + }); + } +} diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinTraceOSInfoBuilder.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinTraceOSInfoBuilder.java similarity index 94% rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinTraceOSInfoBuilder.java rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinTraceOSInfoBuilder.java index 451ba53fca..7d286f84bb 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinTraceOSInfoBuilder.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinTraceOSInfoBuilder.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.oap.server.receiver.zipkin; +package org.apache.skywalking.oap.server.receiver.zipkin.analysis; import com.google.gson.JsonObject; import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory; diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/CacheFactory.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/CacheFactory.java similarity index 89% rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/CacheFactory.java rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/CacheFactory.java index c4d5b06140..8893001242 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/CacheFactory.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/CacheFactory.java @@ -16,10 +16,10 @@ * */ -package org.apache.skywalking.oap.server.receiver.zipkin.cache; +package org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache; import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig; -import org.apache.skywalking.oap.server.receiver.zipkin.cache.caffeine.CaffeineSpanCache; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.caffeine.CaffeineSpanCache; /** * @author wusheng diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/ISpanCache.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/ISpanCache.java similarity index 92% rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/ISpanCache.java rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/ISpanCache.java index b122bcfefb..0f9f3e4337 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/ISpanCache.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/ISpanCache.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.oap.server.receiver.zipkin.cache; +package org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache; import zipkin2.Span; diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/caffeine/CaffeineSpanCache.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/caffeine/CaffeineSpanCache.java similarity index 91% rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/caffeine/CaffeineSpanCache.java rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/caffeine/CaffeineSpanCache.java index c42e710515..d8dc260263 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/caffeine/CaffeineSpanCache.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/caffeine/CaffeineSpanCache.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.oap.server.receiver.zipkin.cache.caffeine; +package org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.caffeine; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; @@ -28,9 +28,9 @@ import java.util.concurrent.locks.ReentrantLock; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig; -import org.apache.skywalking.oap.server.receiver.zipkin.cache.ISpanCache; -import org.apache.skywalking.oap.server.receiver.zipkin.data.ZipkinTrace; -import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.ISpanCache; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.ZipkinTrace; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.Zipkin2SkyWalkingTransfer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import zipkin2.Span; diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/SkyWalkingTrace.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/SkyWalkingTrace.java similarity index 96% rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/SkyWalkingTrace.java rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/SkyWalkingTrace.java index 38c59ec6b3..ab8712c9f5 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/SkyWalkingTrace.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/SkyWalkingTrace.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.oap.server.receiver.zipkin.data; +package org.apache.skywalking.oap.server.receiver.zipkin.analysis.data; import java.util.LinkedList; import java.util.List; diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/ZipkinTrace.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/ZipkinTrace.java similarity index 95% rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/ZipkinTrace.java rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/ZipkinTrace.java index d12beb8b60..e54a6139cf 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/ZipkinTrace.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/ZipkinTrace.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.oap.server.receiver.zipkin.data; +package org.apache.skywalking.oap.server.receiver.zipkin.analysis.data; import java.util.LinkedList; import java.util.List; diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentBuilder.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentBuilder.java similarity index 98% rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentBuilder.java rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentBuilder.java index 3f687a6cc7..e5af454c1e 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentBuilder.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentBuilder.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.oap.server.receiver.zipkin.transform; +package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform; import com.google.common.base.Strings; import java.util.*; @@ -25,8 +25,9 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.skywalking.apm.network.common.KeyStringValuePair; import org.apache.skywalking.apm.network.language.agent.*; import org.apache.skywalking.apm.network.language.agent.v2.*; -import org.apache.skywalking.oap.server.receiver.zipkin.*; -import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace; +import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.*; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace; import org.eclipse.jetty.util.StringUtil; import zipkin2.*; diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentListener.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentListener.java similarity index 84% rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentListener.java rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentListener.java index 9a0b7c7289..5c37c9ab09 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentListener.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentListener.java @@ -16,9 +16,9 @@ * */ -package org.apache.skywalking.oap.server.receiver.zipkin.transform; +package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform; -import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace; public interface SegmentListener { void notify(SkyWalkingTrace trace); diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/Zipkin2SkyWalkingTransfer.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/Zipkin2SkyWalkingTransfer.java similarity index 86% rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/Zipkin2SkyWalkingTransfer.java rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/Zipkin2SkyWalkingTransfer.java index b41e50ebfc..25a59ab33d 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/Zipkin2SkyWalkingTransfer.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/Zipkin2SkyWalkingTransfer.java @@ -16,12 +16,12 @@ * */ -package org.apache.skywalking.oap.server.receiver.zipkin.transform; +package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform; import java.util.LinkedList; import java.util.List; -import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace; -import org.apache.skywalking.oap.server.receiver.zipkin.data.ZipkinTrace; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.ZipkinTrace; import zipkin2.Span; /** diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanEncode.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanEncode.java new file mode 100644 index 0000000000..ee5a91cd4a --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanEncode.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.zipkin.handler; + +/** + * @author wusheng + */ +public class SpanEncode { + public static final int PROTO3 = 1; + public static final int JSON_V2 = 2; + public static final int THRIFT = 3; + public static final int JSON_V1 = 4; + + public static boolean isProto3(int encode) { + return PROTO3 == encode; + } + + public static boolean isJsonV2(int encode) { + return JSON_V2 == encode; + } + + public static boolean isThrift(int encode) { + return THRIFT == encode; + } + + public static boolean isJsonV1(int encode) { + return JSON_V1 == encode; + } +} diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java index 48774f9237..30a713fa2f 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java @@ -18,20 +18,33 @@ package org.apache.skywalking.oap.server.receiver.zipkin.handler; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; +import java.io.*; import java.util.List; import java.util.zip.GZIPInputStream; import javax.servlet.http.HttpServletRequest; -import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker; +import org.apache.skywalking.oap.server.core.cache.*; +import org.apache.skywalking.oap.server.core.source.SourceReceiver; import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig; -import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinTraceOSInfoBuilder; -import org.apache.skywalking.oap.server.receiver.zipkin.cache.CacheFactory; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.ZipkinSkyWalkingTransfer; +import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; import zipkin2.Span; import zipkin2.codec.SpanBytesDecoder; public class SpanProcessor { + private SourceReceiver receiver; + private ServiceInventoryCache serviceInventoryCache; + private EndpointInventoryCache endpointInventoryCache; + private int encode; + + public SpanProcessor(SourceReceiver receiver, + ServiceInventoryCache serviceInventoryCache, + EndpointInventoryCache endpointInventoryCache, int encode) { + this.receiver = receiver; + this.serviceInventoryCache = serviceInventoryCache; + this.endpointInventoryCache = endpointInventoryCache; + this.encode = encode; + } + void convert(ZipkinReceiverConfig config, SpanBytesDecoder decoder, HttpServletRequest request) throws IOException { InputStream inputStream = getInputStream(request); ByteArrayOutputStream out = new ByteArrayOutputStream(); @@ -44,20 +57,13 @@ public class SpanProcessor { List spanList = decoder.decodeList(out.toByteArray()); - spanList.forEach(span -> { - // In Zipkin, the local service name represents the application owner. - String applicationCode = span.localServiceName(); - if (applicationCode != null) { - int applicationId = CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(applicationCode, null); - if (applicationId != 0) { - CoreRegisterLinker.getServiceInstanceInventoryRegister().getOrCreate(applicationId, applicationCode, applicationCode, - span.timestampAsLong(), - ZipkinTraceOSInfoBuilder.getOSInfoForZipkin(applicationCode)); - } - } - - CacheFactory.INSTANCE.get(config).addSpan(span); - }); + if (config.isNeedAnalysis()) { + ZipkinSkyWalkingTransfer transfer = new ZipkinSkyWalkingTransfer(); + transfer.doTransfer(config, spanList); + } else { + SpanForward forward = new SpanForward(config, receiver, serviceInventoryCache, endpointInventoryCache, encode); + forward.send(spanList); + } } private InputStream getInputStream(HttpServletRequest request) throws IOException { diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java index 77f38e48c2..4ab8b8ddcd 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java @@ -18,20 +18,29 @@ package org.apache.skywalking.oap.server.receiver.zipkin.handler; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; +import javax.servlet.http.*; +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.cache.*; +import org.apache.skywalking.oap.server.core.source.SourceReceiver; +import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.server.jetty.JettyHandler; import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.slf4j.*; import zipkin2.codec.SpanBytesDecoder; public class SpanV1JettyHandler extends JettyHandler { private static final Logger logger = LoggerFactory.getLogger(SpanV2JettyHandler.class); private ZipkinReceiverConfig config; + private SourceReceiver sourceReceiver; + private ServiceInventoryCache serviceInventoryCache; + private EndpointInventoryCache endpointInventoryCache; - public SpanV1JettyHandler(ZipkinReceiverConfig config) { + public SpanV1JettyHandler(ZipkinReceiverConfig config, + ModuleManager manager) { + sourceReceiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class); + serviceInventoryCache = manager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class); + endpointInventoryCache = manager.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class); this.config = config; } @@ -48,11 +57,13 @@ public class SpanV1JettyHandler extends JettyHandler { try { String type = request.getHeader("Content-Type"); - SpanBytesDecoder decoder = type != null && type.contains("/x-thrift") - ? SpanBytesDecoder.THRIFT - : SpanBytesDecoder.JSON_V1; + int encode = type != null && type.contains("/x-thrift") ? SpanEncode.THRIFT : SpanEncode.JSON_V1; - SpanProcessor processor = new SpanProcessor(); + SpanBytesDecoder decoder = SpanEncode.isThrift(encode) + ? SpanBytesDecoder.THRIFT + : SpanBytesDecoder.JSON_V1; + + SpanProcessor processor = new SpanProcessor(sourceReceiver, serviceInventoryCache, endpointInventoryCache, encode); processor.convert(config, decoder, request); response.setStatus(202); diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java index 7c8705a284..a904c62987 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java @@ -18,12 +18,14 @@ package org.apache.skywalking.oap.server.receiver.zipkin.handler; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; +import javax.servlet.http.*; +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.cache.*; +import org.apache.skywalking.oap.server.core.source.SourceReceiver; +import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.server.jetty.JettyHandler; import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.slf4j.*; import zipkin2.codec.SpanBytesDecoder; /** @@ -33,8 +35,15 @@ public class SpanV2JettyHandler extends JettyHandler { private static final Logger logger = LoggerFactory.getLogger(SpanV2JettyHandler.class); private ZipkinReceiverConfig config; + private SourceReceiver sourceReceiver; + private ServiceInventoryCache serviceInventoryCache; + private EndpointInventoryCache endpointInventoryCache; - public SpanV2JettyHandler(ZipkinReceiverConfig config) { + public SpanV2JettyHandler(ZipkinReceiverConfig config, + ModuleManager manager) { + sourceReceiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class); + serviceInventoryCache = manager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class); + endpointInventoryCache = manager.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class); this.config = config; } @@ -51,11 +60,13 @@ public class SpanV2JettyHandler extends JettyHandler { try { String type = request.getHeader("Content-Type"); - SpanBytesDecoder decoder = type != null && type.contains("/x-protobuf") + int encode = type != null && type.contains("/x-protobuf") ? SpanEncode.PROTO3 : SpanEncode.JSON_V2; + + SpanBytesDecoder decoder = SpanEncode.isProto3(encode) ? SpanBytesDecoder.PROTO3 : SpanBytesDecoder.JSON_V2; - SpanProcessor processor = new SpanProcessor(); + SpanProcessor processor = new SpanProcessor(sourceReceiver, serviceInventoryCache, endpointInventoryCache, encode); processor.convert(config, decoder, request); response.setStatus(202); diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java new file mode 100644 index 0000000000..3d38868b66 --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.zipkin.trace; + +import java.util.List; +import org.apache.skywalking.apm.util.StringUtil; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.cache.*; +import org.apache.skywalking.oap.server.core.source.*; +import org.apache.skywalking.oap.server.library.util.*; +import org.apache.skywalking.oap.server.receiver.zipkin.*; +import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanEncode; +import org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpan; +import zipkin2.Span; +import zipkin2.codec.SpanBytesEncoder; + +/** + * @author wusheng + */ +public class SpanForward { + private ZipkinReceiverConfig config; + private SourceReceiver receiver; + private ServiceInventoryCache serviceInventoryCache; + private EndpointInventoryCache endpointInventoryCache; + private int encode; + + public SpanForward(ZipkinReceiverConfig config, SourceReceiver receiver, + ServiceInventoryCache serviceInventoryCache, + EndpointInventoryCache endpointInventoryCache, int encode) { + this.config = config; + this.receiver = receiver; + this.serviceInventoryCache = serviceInventoryCache; + this.endpointInventoryCache = endpointInventoryCache; + this.encode = encode; + } + + public void send(List spanList) { + spanList.forEach(span -> { + ZipkinSpan zipkinSpan = new ZipkinSpan(); + zipkinSpan.setTraceId(span.traceId()); + zipkinSpan.setSpanId(span.id()); + String serviceName = span.localServiceName(); + int serviceId = Const.NONE; + if (!StringUtil.isEmpty(serviceName)) { + serviceId = serviceInventoryCache.getServiceId(serviceName); + if (serviceId != Const.NONE) { + zipkinSpan.setServiceId(serviceId); + } else { + /** + * Only register, but don't wait. + * For this span, service id will be missed. + */ + CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(serviceName, null); + } + } + + String spanName = span.name(); + Span.Kind kind = span.kind(); + switch (kind) { + case SERVER: + case CONSUMER: + if (!StringUtil.isEmpty(spanName) && serviceId != Const.NONE) { + int endpointId = endpointInventoryCache.getEndpointId(serviceId, spanName, + DetectPoint.SERVER.ordinal()); + if (endpointId != Const.NONE) { + zipkinSpan.setEndpointId(endpointId); + } else if (config.isRegisterZipkinEndpoint()) { + CoreRegisterLinker.getEndpointInventoryRegister().getOrCreate(serviceId, spanName, DetectPoint.SERVER); + } + } + } + if (!StringUtil.isEmpty(spanName)) { + zipkinSpan.setEndpointName(spanName); + } + long startTime = span.timestampAsLong() / 1000; + zipkinSpan.setStartTime(startTime); + if (startTime != 0) { + long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(zipkinSpan.getStartTime()); + zipkinSpan.setTimeBucket(timeBucket); + } + + long latency = span.durationAsLong() / 1000; + + zipkinSpan.setEndTime(startTime + latency); + zipkinSpan.setIsError(BooleanUtils.booleanToValue(false)); + zipkinSpan.setEncode(SpanEncode.PROTO3); + zipkinSpan.setLatency((int)latency); + zipkinSpan.setDataBinary(SpanBytesEncoder.PROTO3.encode(span)); + + receiver.receive(zipkinSpan); + }); + } +} diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SpringSleuthSegmentBuilderTest.java similarity index 98% rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SpringSleuthSegmentBuilderTest.java index 9a538858d5..0c576c7dc7 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SpringSleuthSegmentBuilderTest.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.oap.server.receiver.zipkin.transform; +package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform; import com.google.gson.JsonObject; import java.io.UnsupportedEncodingException; @@ -26,7 +26,7 @@ import org.apache.skywalking.apm.network.language.agent.v2.*; import org.apache.skywalking.oap.server.core.register.NodeType; import org.apache.skywalking.oap.server.core.register.service.*; import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker; -import org.apache.skywalking.oap.server.receiver.zipkin.data.*; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.*; import org.junit.*; import org.powermock.reflect.Whitebox; import zipkin2.Span; diff --git a/oap-server/server-starter/pom.xml b/oap-server/server-starter/pom.xml index 37c5330f67..cfd77d3716 100644 --- a/oap-server/server-starter/pom.xml +++ b/oap-server/server-starter/pom.xml @@ -126,6 +126,11 @@ storage-elasticsearch-plugin ${project.version} + + org.apache.skywalking + storage-zipkin-plugin + ${project.version} + diff --git a/oap-server/server-storage-plugin/pom.xml b/oap-server/server-storage-plugin/pom.xml index 4cb4fcbec9..99291e12ec 100644 --- a/oap-server/server-storage-plugin/pom.xml +++ b/oap-server/server-storage-plugin/pom.xml @@ -30,6 +30,7 @@ storage-jdbc-hikaricp-plugin storage-elasticsearch-plugin + storage-zipkin-plugin \ No newline at end of file diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java index 7d461aa01d..6608a587de 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java @@ -59,7 +59,7 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig { this.password = password; } - int getIndexShardsNumber() { + public int getIndexShardsNumber() { return indexShardsNumber; } @@ -67,7 +67,7 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig { this.indexShardsNumber = indexShardsNumber; } - int getIndexReplicasNumber() { + public int getIndexReplicasNumber() { return indexReplicasNumber; } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java index 7d1a7135ca..ed3bc4bafb 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java @@ -70,8 +70,8 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider { private static final Logger logger = LoggerFactory.getLogger(StorageModuleElasticsearchProvider.class); - private final StorageModuleElasticsearchConfig config; - private ElasticSearchClient elasticSearchClient; + protected final StorageModuleElasticsearchConfig config; + protected ElasticSearchClient elasticSearchClient; public StorageModuleElasticsearchProvider() { super(); diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java index ac5645a6b3..4528189f1c 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java @@ -147,4 +147,8 @@ public class TraceQueryEsDAO extends EsDAO implements ITraceQueryDAO { } return segmentRecords; } + + @Override public List doFlexibleTraceQuery(String traceId) throws IOException { + return Collections.emptyList(); + } } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java index ed8bf73416..ec231e5bca 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java @@ -166,6 +166,10 @@ public class H2TraceQueryDAO implements ITraceQueryDAO { return segmentRecords; } + @Override public List doFlexibleTraceQuery(String traceId) throws IOException { + return Collections.emptyList(); + } + protected JDBCHikariCPClient getClient() { return h2Client; } diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/pom.xml b/oap-server/server-storage-plugin/storage-zipkin-plugin/pom.xml new file mode 100644 index 0000000000..61d313b8a0 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/pom.xml @@ -0,0 +1,43 @@ + + + + + + server-storage-plugin + org.apache.skywalking + 6.1.0-SNAPSHOT + + 4.0.0 + + storage-zipkin-plugin + + + + org.apache.skywalking + storage-elasticsearch-plugin + ${project.version} + + + io.zipkin.zipkin2 + zipkin + + + \ No newline at end of file diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpan.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpan.java new file mode 100644 index 0000000000..3cf99f623c --- /dev/null +++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpan.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.zipkin; + +import lombok.*; +import org.apache.skywalking.oap.server.core.source.*; + +import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.*; + +/** + * @author peng-yongsheng + */ +@ScopeDeclaration(id = ZIPKIN_SPAN, name = "ZipkinSpan") +public class ZipkinSpan extends Source { + + @Override public int scope() { + return DefaultScopeDefine.ZIPKIN_SPAN; + } + + @Override public String getEntityId() { + return traceId + spanId; + } + + @Setter @Getter private String traceId; + @Setter @Getter private String spanId; + @Setter @Getter private int serviceId; + @Setter @Getter private int serviceInstanceId; + @Setter @Getter private String endpointName; + @Setter @Getter private int endpointId; + @Setter @Getter private long startTime; + @Setter @Getter private long endTime; + @Setter @Getter private int latency; + @Setter @Getter private int isError; + @Setter @Getter private byte[] dataBinary; + @Setter @Getter private int encode; +} diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java new file mode 100644 index 0000000000..531443bdb6 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.zipkin; + +import java.util.*; +import lombok.*; +import org.apache.skywalking.apm.util.StringUtil; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.analysis.record.Record; +import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordType; +import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; +import org.apache.skywalking.oap.server.core.storage.StorageBuilder; +import org.apache.skywalking.oap.server.core.storage.annotation.*; +import org.apache.skywalking.oap.server.library.util.CollectionUtils; + +@RecordType +@StorageEntity(name = ZipkinSpanRecord.INDEX_NAME, builder = ZipkinSpanRecord.Builder.class, sourceScopeId = DefaultScopeDefine.ZIPKIN_SPAN) +public class ZipkinSpanRecord extends Record { + public static final String INDEX_NAME = "zipkin_span"; + public static final String TRACE_ID = "trace_id"; + public static final String SPAN_ID = "span_id"; + public static final String SERVICE_ID = "service_id"; + public static final String SERVICE_INSTANCE_ID = "service_instance_id"; + public static final String ENDPOINT_NAME = "endpoint_name"; + public static final String ENDPOINT_ID = "endpoint_id"; + public static final String START_TIME = "start_time"; + public static final String END_TIME = "end_time"; + public static final String LATENCY = "latency"; + public static final String IS_ERROR = "is_error"; + public static final String DATA_BINARY = "data_binary"; + public static final String ENCODE = "encode"; + + @Setter @Getter @Column(columnName = TRACE_ID) @IDColumn private String traceId; + @Setter @Getter @Column(columnName = SPAN_ID) @IDColumn private String spanId; + @Setter @Getter @Column(columnName = SERVICE_ID) @IDColumn private int serviceId; + @Setter @Getter @Column(columnName = SERVICE_INSTANCE_ID) @IDColumn private int serviceInstanceId; + @Setter @Getter @Column(columnName = ENDPOINT_NAME, matchQuery = true) @IDColumn private String endpointName; + @Setter @Getter @Column(columnName = ENDPOINT_ID) @IDColumn private int endpointId; + @Setter @Getter @Column(columnName = START_TIME) @IDColumn private long startTime; + @Setter @Getter @Column(columnName = END_TIME) @IDColumn private long endTime; + @Setter @Getter @Column(columnName = LATENCY) @IDColumn private int latency; + @Setter @Getter @Column(columnName = IS_ERROR) @IDColumn private int isError; + @Setter @Getter @Column(columnName = DATA_BINARY) @IDColumn private byte[] dataBinary; + @Setter @Getter @Column(columnName = ENCODE) @IDColumn private int encode; + + @Override public String id() { + return traceId + "-" + spanId; + } + + public static class Builder implements StorageBuilder { + + @Override public Map data2Map(ZipkinSpanRecord storageData) { + Map map = new HashMap<>(); + map.put(TRACE_ID, storageData.getTraceId()); + map.put(SPAN_ID, storageData.getSpanId()); + map.put(SERVICE_ID, storageData.getServiceId()); + map.put(SERVICE_INSTANCE_ID, storageData.getServiceInstanceId()); + map.put(ENDPOINT_NAME, storageData.getEndpointName()); + map.put(ENDPOINT_ID, storageData.getEndpointId()); + map.put(START_TIME, storageData.getStartTime()); + map.put(END_TIME, storageData.getEndTime()); + map.put(LATENCY, storageData.getLatency()); + map.put(IS_ERROR, storageData.getIsError()); + map.put(TIME_BUCKET, storageData.getTimeBucket()); + if (CollectionUtils.isEmpty(storageData.getDataBinary())) { + map.put(DATA_BINARY, Const.EMPTY_STRING); + } else { + map.put(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary()))); + } + map.put(ENCODE, storageData.getEncode()); + return map; + } + + @Override public ZipkinSpanRecord map2Data(Map dbMap) { + ZipkinSpanRecord record = new ZipkinSpanRecord(); + record.setTraceId((String)dbMap.get(TRACE_ID)); + record.setSpanId((String)dbMap.get(SPAN_ID)); + record.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue()); + record.setServiceInstanceId(((Number)dbMap.get(SERVICE_INSTANCE_ID)).intValue()); + record.setEndpointName((String)dbMap.get(ENDPOINT_NAME)); + record.setEndpointId(((Number)dbMap.get(ENDPOINT_ID)).intValue()); + record.setStartTime(((Number)dbMap.get(START_TIME)).longValue()); + record.setEndTime(((Number)dbMap.get(END_TIME)).longValue()); + record.setLatency(((Number)dbMap.get(LATENCY)).intValue()); + record.setIsError(((Number)dbMap.get(IS_ERROR)).intValue()); + record.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue()); + if (StringUtil.isEmpty((String)dbMap.get(DATA_BINARY))) { + record.setDataBinary(new byte[] {}); + } else { + record.setDataBinary(Base64.getDecoder().decode((String)dbMap.get(DATA_BINARY))); + } + record.setEncode(((Number)dbMap.get(ENCODE)).intValue()); + return record; + } + } +} diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecordDispatcher.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecordDispatcher.java new file mode 100644 index 0000000000..eed4ca1aa3 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecordDispatcher.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.zipkin; + +import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher; +import org.apache.skywalking.oap.server.core.analysis.worker.RecordProcess; + +/** + * Dispatch for Zipkin native mode spans. + * + * @author wusheng + */ +public class ZipkinSpanRecordDispatcher implements SourceDispatcher { + @Override public void dispatch(ZipkinSpan source) { + ZipkinSpanRecord segment = new ZipkinSpanRecord(); + segment.setTraceId(source.getTraceId()); + segment.setSpanId(source.getSpanId()); + segment.setServiceId(source.getServiceId()); + segment.setServiceInstanceId(source.getServiceInstanceId()); + segment.setEndpointName(source.getEndpointName()); + segment.setEndpointId(source.getEndpointId()); + segment.setStartTime(source.getStartTime()); + segment.setEndTime(source.getEndTime()); + segment.setLatency(source.getLatency()); + segment.setIsError(source.getIsError()); + segment.setDataBinary(source.getDataBinary()); + segment.setTimeBucket(source.getTimeBucket()); + segment.setEncode(source.getEncode()); + + RecordProcess.INSTANCE.in(segment); + } +} diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinStorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinStorageModuleElasticsearchProvider.java new file mode 100644 index 0000000000..e529fda25f --- /dev/null +++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinStorageModuleElasticsearchProvider.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.zipkin.elasticsearch; + +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache; +import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO; +import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; +import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchProvider; +import org.slf4j.*; + +/** + * @author peng-yongsheng + */ +public class ZipkinStorageModuleElasticsearchProvider extends StorageModuleElasticsearchProvider { + + private static final Logger logger = LoggerFactory.getLogger(ZipkinStorageModuleElasticsearchProvider.class); + private ZipkinTraceQueryEsDAO traceQueryEsDAO; + + @Override + public String name() { + return "zipkin-elasticsearch"; + } + + @Override + public void prepare() throws ServiceNotProvidedException { + super.prepare(); + traceQueryEsDAO = new ZipkinTraceQueryEsDAO(elasticSearchClient); + this.registerServiceImplementation(ITraceQueryDAO.class, traceQueryEsDAO); + } + + @Override public void notifyAfterCompleted() { + super.notifyAfterCompleted(); + traceQueryEsDAO.setServiceInventoryCache(getManager().find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class)); + } + + @Override + public String[] requiredModules() { + return new String[] {CoreModule.NAME}; + } +} diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java new file mode 100644 index 0000000000..448e745732 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.zipkin.elasticsearch; + +import com.google.common.base.Strings; +import java.io.IOException; +import java.util.*; +import lombok.Setter; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; +import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache; +import org.apache.skywalking.oap.server.core.query.entity.*; +import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO; +import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; +import org.apache.skywalking.oap.server.library.util.BooleanUtils; +import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO; +import org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.index.query.*; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.aggregations.*; +import org.elasticsearch.search.aggregations.bucket.terms.*; +import org.elasticsearch.search.aggregations.metrics.max.Max; +import org.elasticsearch.search.aggregations.metrics.min.Min; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.SortOrder; +import zipkin2.Span; +import zipkin2.codec.SpanBytesDecoder; + +import static org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord.*; + +public class ZipkinTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO { + @Setter + private ServiceInventoryCache serviceInventoryCache; + + public ZipkinTraceQueryEsDAO( + ElasticSearchClient client) { + super(client); + } + + @Override + public TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration, + String endpointName, int serviceId, int serviceInstanceId, int endpointId, String traceId, int limit, int from, + TraceState traceState, QueryOrder queryOrder) throws IOException { + + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource(); + + BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); + sourceBuilder.query(boolQueryBuilder); + List mustQueryList = boolQueryBuilder.must(); + + if (startSecondTB != 0 && endSecondTB != 0) { + mustQueryList.add(QueryBuilders.rangeQuery(TIME_BUCKET).gte(startSecondTB).lte(endSecondTB)); + } + + if (minDuration != 0 || maxDuration != 0) { + RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(LATENCY); + if (minDuration != 0) { + rangeQueryBuilder.gte(minDuration); + } + if (maxDuration != 0) { + rangeQueryBuilder.lte(maxDuration); + } + boolQueryBuilder.must().add(rangeQueryBuilder); + } + if (!Strings.isNullOrEmpty(endpointName)) { + mustQueryList.add(QueryBuilders.matchPhraseQuery(ENDPOINT_NAME, endpointName)); + } + if (serviceId != 0) { + boolQueryBuilder.must().add(QueryBuilders.termQuery(SERVICE_ID, serviceId)); + } + if (serviceInstanceId != 0) { + boolQueryBuilder.must().add(QueryBuilders.termQuery(SERVICE_INSTANCE_ID, serviceInstanceId)); + } + if (endpointId != 0) { + boolQueryBuilder.must().add(QueryBuilders.termQuery(ENDPOINT_ID, endpointId)); + } + if (!Strings.isNullOrEmpty(traceId)) { + boolQueryBuilder.must().add(QueryBuilders.termQuery(TRACE_ID, traceId)); + } + switch (traceState) { + case ERROR: + mustQueryList.add(QueryBuilders.matchQuery(IS_ERROR, BooleanUtils.TRUE)); + break; + case SUCCESS: + mustQueryList.add(QueryBuilders.matchQuery(IS_ERROR, BooleanUtils.FALSE)); + break; + } + + TermsAggregationBuilder builder = AggregationBuilders.terms(TRACE_ID).field(TRACE_ID).size(limit) + .subAggregation( + AggregationBuilders.max(LATENCY).field(LATENCY) + ) + .subAggregation( + AggregationBuilders.min(START_TIME).field(START_TIME) + ); + switch (queryOrder) { + case BY_START_TIME: + builder.order(BucketOrder.aggregation(START_TIME, false)); + break; + case BY_DURATION: + builder.order(BucketOrder.aggregation(LATENCY, false)); + break; + } + sourceBuilder.aggregation(builder); + + SearchResponse response = getClient().search(ZipkinSpanRecord.INDEX_NAME, sourceBuilder); + + TraceBrief traceBrief = new TraceBrief(); + + Terms terms = response.getAggregations().get(TRACE_ID); + + for (Terms.Bucket termsBucket : terms.getBuckets()) { + BasicTrace basicTrace = new BasicTrace(); + + basicTrace.setSegmentId(termsBucket.getKeyAsString()); + Min startTime = termsBucket.getAggregations().get(START_TIME); + Max latency = termsBucket.getAggregations().get(LATENCY); + basicTrace.setStart(String.valueOf((long)startTime.getValue())); + basicTrace.getEndpointNames().add(""); + basicTrace.setDuration((int)latency.getValue()); + basicTrace.setError(false); + basicTrace.getTraceIds().add(termsBucket.getKeyAsString()); + traceBrief.getTraces().add(basicTrace); + } + + return traceBrief; + } + + @Override public List queryByTraceId(String traceId) throws IOException { + return Collections.emptyList(); + } + + @Override public List doFlexibleTraceQuery( + String traceId) throws IOException { + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource(); + sourceBuilder.query(QueryBuilders.termQuery(TRACE_ID, traceId)); + sourceBuilder.sort(START_TIME, SortOrder.ASC); + sourceBuilder.size(1000); + + SearchResponse response = getClient().search(ZipkinSpanRecord.INDEX_NAME, sourceBuilder); + + List spanList = new ArrayList<>(); + + boolean isFirst = true; + for (SearchHit searchHit : response.getHits().getHits()) { + int serviceId = ((Number)searchHit.getSourceAsMap().get(SERVICE_ID)).intValue(); + String dataBinaryBase64 = (String)searchHit.getSourceAsMap().get(SegmentRecord.DATA_BINARY); + Span span = SpanBytesDecoder.PROTO3.decodeOne(Base64.getDecoder().decode(dataBinaryBase64)); + + org.apache.skywalking.oap.server.core.query.entity.Span swSpan = new org.apache.skywalking.oap.server.core.query.entity.Span(); + + swSpan.setTraceId(span.traceId()); + swSpan.setEndpointName(span.name()); + swSpan.setStartTime(span.timestamp() / 1000); + swSpan.setEndTime(swSpan.getStartTime() + span.durationAsLong() / 1000); + span.tags().forEach((key, value) -> { + swSpan.getTags().add(new KeyValue(key, value)); + }); + span.annotations().forEach(annotation -> { + LogEntity entity = new LogEntity(); + entity.setTime(annotation.timestamp() / 1000); + entity.getData().add(new KeyValue("annotation", annotation.value())); + swSpan.getLogs().add(entity); + }); + if (serviceId != Const.NONE) { + swSpan.setServiceCode(serviceInventoryCache.get(serviceId).getName()); + } + swSpan.setSpanId(0); + swSpan.setParentSpanId(-1); + swSpan.setSegmentSpanId(span.id()); + swSpan.setSegmentId(span.id()); + Span.Kind kind = span.kind(); + switch (kind) { + case CLIENT: + case PRODUCER: + swSpan.setType("Entry"); + break; + case SERVER: + case CONSUMER: + swSpan.setType("Exit"); + break; + default: + swSpan.setType("Local"); + + } + + if (isFirst) { + swSpan.setRoot(true); + swSpan.setSegmentParentSpanId(""); + isFirst = false; + } else { + Ref ref = new Ref(); + ref.setTraceId(span.traceId()); + ref.setParentSegmentId(span.parentId()); + ref.setType(RefType.CROSS_PROCESS); + ref.setParentSpanId(0); + + swSpan.getRefs().add(ref); + swSpan.setSegmentParentSpanId(span.parentId()); + } + spanList.add(swSpan); + } + return spanList; + } +} diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider new file mode 100644 index 0000000000..de8e186612 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +org.apache.skywalking.oap.server.storage.plugin.zipkin.elasticsearch.ZipkinStorageModuleElasticsearchProvider \ No newline at end of file -- GitLab