提交 601b472e 编写于 作者: wu-sheng's avatar wu-sheng 提交者: 彭勇升 pengys

Support Backend acts as pure Zipkin collector (#2424)

* Codebase for zipkin span persistence.

* Fix missing fields in storage.

* Miss the latency field.

* Finish some tests.

* Fix wrong latency.

* Finish doc and reset application.yml

* Make the description more clear.
上级 1fd5c579
......@@ -11,8 +11,7 @@ We have following receivers, and `default` implementors are provided in our Apac
1. **receiver-jvm**. gRPC services accept JVM metric data.
1. **istio-telemetry**. Istio telemetry is from Istio official bypass adaptor, this receiver match its gRPC services.
1. **envoy-metric**. Envoy `metrics_service` supported by this receiver. OAL script support all GAUGE type metrics.
1. **receiver_zipkin**. HTTP service accepts Span in Zipkin v1 and v2 formats. Notice, this receiver only
works as expected in backend single node mode. Cluster mode is not supported. Welcome anyone to improve this.
1. **receiver_zipkin**. See [details](#zipkin-receiver).
The sample settings of these receivers should be already in default `application.yml`, and also list here
```yaml
......@@ -59,4 +58,27 @@ receiver-sharing-server:
```
Notice, if you add these settings, make sure they are not as same as core module,
because gRPC/HTTP servers of core are still used for UI and OAP internal communications.
\ No newline at end of file
because gRPC/HTTP servers of core are still used for UI and OAP internal communications.
## Zipkin receiver
Zipkin receiver could work in two different mode.
1. Tracing mode(default). Tracing mode is that, skywalking OAP acts like zipkin collector,
fully supports Zipkin v1/v2 formats through HTTP service,
also provide persistence and query in skywalking UI.
But it wouldn't analysis metric from them. In most case, I suggest you could use this feature, when metrics come from service mesh.
Notice, in this mode, Zipkin receiver requires `zipkin-elasticsearch` storage implementation active.
Read [this](backend-storage.md#elasticsearch-6-with-zipkin-trace-extension) to know
how to active.
1. Analysis mode(Not production ready), receive Zipkin v1/v2 formats through HTTP service. Transform the trace to skywalking
native format, and analysis like skywalking trace. This feature can't work in production env right now,
because of Zipkin tag/endpoint value unpredictable, we can't make sure it fits production env requirements.
Active `analysis mode`, you should set `needAnalysis` config.
```yaml
receiver_zipkin:
default:
host: ${SW_RECEIVER_ZIPKIN_HOST:0.0.0.0}
port: ${SW_RECEIVER_ZIPKIN_PORT:9411}
contextPath: ${SW_RECEIVER_ZIPKIN_CONTEXT_PATH:/}
needAnalysis: true
```
\ No newline at end of file
......@@ -49,6 +49,26 @@ storage:
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
```
### ElasticSearch 6 with Zipkin trace extension
This implementation shares most of `elasticsearch`, just extend to support zipkin span storage.
It has all same configs.
```yaml
storage:
zipkin-elasticsearch:
nameSpace: ${SW_NAMESPACE:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
user: ${SW_ES_USER:""}
password: ${SW_ES_PASSWORD:""}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
# Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests
bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
```
### About Namespace
When namespace is set, names of all indexes in ElasticSearch will use it as prefix.
......
......@@ -103,14 +103,18 @@ public class TraceQueryService implements Service {
Trace trace = new Trace();
List<SegmentRecord> segmentRecords = getTraceQueryDAO().queryByTraceId(traceId);
for (SegmentRecord segment : segmentRecords) {
if (nonNull(segment)) {
if (segment.getVersion() == 2) {
SegmentObject segmentObject = SegmentObject.parseFrom(segment.getDataBinary());
trace.getSpans().addAll(buildSpanV2List(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList()));
} else {
TraceSegmentObject segmentObject = TraceSegmentObject.parseFrom(segment.getDataBinary());
trace.getSpans().addAll(buildSpanList(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList()));
if (segmentRecords.isEmpty()) {
trace.getSpans().addAll(getTraceQueryDAO().doFlexibleTraceQuery(traceId));
} else {
for (SegmentRecord segment : segmentRecords) {
if (nonNull(segment)) {
if (segment.getVersion() == 2) {
SegmentObject segmentObject = SegmentObject.parseFrom(segment.getDataBinary());
trace.getSpans().addAll(buildSpanV2List(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList()));
} else {
TraceSegmentObject segmentObject = TraceSegmentObject.parseFrom(segment.getDataBinary());
trace.getSpans().addAll(buildSpanList(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList()));
}
}
}
}
......
......@@ -28,4 +28,12 @@ import lombok.*;
public class KeyValue {
private String key;
private String value;
public KeyValue(String key, String value) {
this.key = key;
this.value = value;
}
public KeyValue() {
}
}
......@@ -59,6 +59,7 @@ public class DefaultScopeDefine {
public static final int SERVICE_INSTANCE_CLR_GC = 20;
public static final int SERVICE_INSTANCE_CLR_THREAD = 21;
public static final int ENVOY_INSTANCE_METRIC = 22;
public static final int ZIPKIN_SPAN = 23;
/**
* Catalog of scope, the indicator processor could use this to group all generated indicators by oal tool.
......
......@@ -34,4 +34,12 @@ public interface ITraceQueryDAO extends Service {
int limit, int from, TraceState traceState, QueryOrder queryOrder) throws IOException;
List<SegmentRecord> queryByTraceId(String traceId) throws IOException;
/**
* This method gives more flexible for unnative
* @param traceId
* @return
* @throws IOException
*/
List<Span> doFlexibleTraceQuery(String traceId) throws IOException;
}
......@@ -35,6 +35,11 @@
<artifactId>skywalking-trace-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>storage-zipkin-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-register-receiver-plugin</artifactId>
......
......@@ -19,14 +19,14 @@
package org.apache.skywalking.oap.server.receiver.zipkin;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
public class CoreRegisterLinker {
private static volatile ModuleManager MODULE_MANAGER;
private static volatile IServiceInventoryRegister SERVICE_INVENTORY_REGISTER;
private static volatile IServiceInstanceInventoryRegister SERVICE_INSTANCE_INVENTORY_REGISTER;
private static volatile IEndpointInventoryRegister ENDPOINT_INVENTORY_REGISTER;
public static void setModuleManager(ModuleManager moduleManager) {
CoreRegisterLinker.MODULE_MANAGER = moduleManager;
......@@ -45,4 +45,11 @@ public class CoreRegisterLinker {
}
return SERVICE_INSTANCE_INVENTORY_REGISTER;
}
public static IEndpointInventoryRegister getEndpointInventoryRegister() {
if (ENDPOINT_INVENTORY_REGISTER == null) {
ENDPOINT_INVENTORY_REGISTER = MODULE_MANAGER.find(CoreModule.NAME).provider().getService(IEndpointInventoryRegister.class);
}
return ENDPOINT_INVENTORY_REGISTER;
}
}
......@@ -18,57 +18,20 @@
package org.apache.skywalking.oap.server.receiver.zipkin;
import lombok.*;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
/**
* @author wusheng
*/
@Setter
@Getter
public class ZipkinReceiverConfig extends ModuleConfig {
private String host;
private int port;
private String contextPath;
private int expireTime = 20;
private int maxCacheSize = 1_000_000;
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getContextPath() {
return contextPath;
}
public void setContextPath(String contextPath) {
this.contextPath = contextPath;
}
public int getExpireTime() {
return expireTime;
}
public void setExpireTime(int expireTime) {
this.expireTime = expireTime;
}
public int getMaxCacheSize() {
return maxCacheSize;
}
public void setMaxCacheSize(int maxCacheSize) {
this.maxCacheSize = maxCacheSize;
}
private boolean needAnalysis = false;
private boolean registerZipkinEndpoint = true;
}
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.receiver.zipkin;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
......@@ -27,9 +28,10 @@ import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.*;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV1JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV2JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.Zipkin2SkyWalkingTransfer;
/**
* @author wusheng
......@@ -65,12 +67,14 @@ public class ZipkinReceiverProvider extends ModuleProvider {
jettyServer = new JettyServer(config.getHost(), config.getPort(), config.getContextPath());
jettyServer.initialize();
jettyServer.addHandler(new SpanV1JettyHandler(config));
jettyServer.addHandler(new SpanV2JettyHandler(config));
jettyServer.addHandler(new SpanV1JettyHandler(config, getManager()));
jettyServer.addHandler(new SpanV2JettyHandler(config, getManager()));
ISegmentParserService segmentParseService = getManager().find(TraceModule.NAME).provider().getService(ISegmentParserService.class);
Receiver2AnalysisBridge bridge = new Receiver2AnalysisBridge(segmentParseService);
Zipkin2SkyWalkingTransfer.INSTANCE.addListener(bridge);
if (config.isNeedAnalysis()) {
ISegmentParserService segmentParseService = getManager().find(TraceModule.NAME).provider().getService(ISegmentParserService.class);
Receiver2AnalysisBridge bridge = new Receiver2AnalysisBridge(segmentParseService);
Zipkin2SkyWalkingTransfer.INSTANCE.addListener(bridge);
}
}
@Override public void notifyAfterCompleted() throws ModuleStartException {
......@@ -82,6 +86,13 @@ public class ZipkinReceiverProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
return new String[] {TraceModule.NAME};
if (config.isNeedAnalysis()) {
return new String[] {TraceModule.NAME};
} else {
/**
* In pure trace status, we don't need the trace receiver.
*/
return new String[] {CoreModule.NAME};
}
}
}
......@@ -16,12 +16,12 @@
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin;
package org.apache.skywalking.oap.server.receiver.zipkin.analysis;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService;
import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
import org.apache.skywalking.oap.server.receiver.zipkin.transform.SegmentListener;
import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.SegmentListener;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.Zipkin2SkyWalkingTransfer;
/**
* Send the segments to Analysis module, like receiving segments from native SkyWalking agents.
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.analysis;
import java.util.List;
import org.apache.skywalking.oap.server.receiver.zipkin.*;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.CacheFactory;
import zipkin2.Span;
public class ZipkinSkyWalkingTransfer {
public void doTransfer(ZipkinReceiverConfig config, List<Span> spanList) {
spanList.forEach(span -> {
// In Zipkin, the local service name represents the application owner.
String applicationCode = span.localServiceName();
if (applicationCode != null) {
int applicationId = CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(applicationCode, null);
if (applicationId != 0) {
CoreRegisterLinker.getServiceInstanceInventoryRegister().getOrCreate(applicationId, applicationCode, applicationCode,
span.timestampAsLong(),
ZipkinTraceOSInfoBuilder.getOSInfoForZipkin(applicationCode));
}
}
CacheFactory.INSTANCE.get(config).addSpan(span);
});
}
}
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin;
package org.apache.skywalking.oap.server.receiver.zipkin.analysis;
import com.google.gson.JsonObject;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
......
......@@ -16,10 +16,10 @@
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.cache;
package org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.apache.skywalking.oap.server.receiver.zipkin.cache.caffeine.CaffeineSpanCache;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.caffeine.CaffeineSpanCache;
/**
* @author wusheng
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.cache.caffeine;
package org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.caffeine;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
......@@ -28,9 +28,9 @@ import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.apache.skywalking.oap.server.receiver.zipkin.cache.ISpanCache;
import org.apache.skywalking.oap.server.receiver.zipkin.data.ZipkinTrace;
import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.ISpanCache;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.ZipkinTrace;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.Zipkin2SkyWalkingTransfer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.Span;
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.data;
package org.apache.skywalking.oap.server.receiver.zipkin.analysis.data;
import java.util.LinkedList;
import java.util.List;
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.data;
package org.apache.skywalking.oap.server.receiver.zipkin.analysis.data;
import java.util.LinkedList;
import java.util.List;
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.transform;
package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform;
import com.google.common.base.Strings;
import java.util.*;
......@@ -25,8 +25,9 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.skywalking.apm.network.common.KeyStringValuePair;
import org.apache.skywalking.apm.network.language.agent.*;
import org.apache.skywalking.apm.network.language.agent.v2.*;
import org.apache.skywalking.oap.server.receiver.zipkin.*;
import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.*;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace;
import org.eclipse.jetty.util.StringUtil;
import zipkin2.*;
......
......@@ -16,9 +16,9 @@
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.transform;
package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform;
import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace;
public interface SegmentListener {
void notify(SkyWalkingTrace trace);
......
......@@ -16,12 +16,12 @@
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.transform;
package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
import org.apache.skywalking.oap.server.receiver.zipkin.data.ZipkinTrace;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.ZipkinTrace;
import zipkin2.Span;
/**
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.handler;
/**
* @author wusheng
*/
public class SpanEncode {
public static final int PROTO3 = 1;
public static final int JSON_V2 = 2;
public static final int THRIFT = 3;
public static final int JSON_V1 = 4;
public static boolean isProto3(int encode) {
return PROTO3 == encode;
}
public static boolean isJsonV2(int encode) {
return JSON_V2 == encode;
}
public static boolean isThrift(int encode) {
return THRIFT == encode;
}
public static boolean isJsonV1(int encode) {
return JSON_V1 == encode;
}
}
......@@ -18,20 +18,33 @@
package org.apache.skywalking.oap.server.receiver.zipkin.handler;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.*;
import java.util.List;
import java.util.zip.GZIPInputStream;
import javax.servlet.http.HttpServletRequest;
import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinTraceOSInfoBuilder;
import org.apache.skywalking.oap.server.receiver.zipkin.cache.CacheFactory;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.ZipkinSkyWalkingTransfer;
import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
public class SpanProcessor {
private SourceReceiver receiver;
private ServiceInventoryCache serviceInventoryCache;
private EndpointInventoryCache endpointInventoryCache;
private int encode;
public SpanProcessor(SourceReceiver receiver,
ServiceInventoryCache serviceInventoryCache,
EndpointInventoryCache endpointInventoryCache, int encode) {
this.receiver = receiver;
this.serviceInventoryCache = serviceInventoryCache;
this.endpointInventoryCache = endpointInventoryCache;
this.encode = encode;
}
void convert(ZipkinReceiverConfig config, SpanBytesDecoder decoder, HttpServletRequest request) throws IOException {
InputStream inputStream = getInputStream(request);
ByteArrayOutputStream out = new ByteArrayOutputStream();
......@@ -44,20 +57,13 @@ public class SpanProcessor {
List<Span> spanList = decoder.decodeList(out.toByteArray());
spanList.forEach(span -> {
// In Zipkin, the local service name represents the application owner.
String applicationCode = span.localServiceName();
if (applicationCode != null) {
int applicationId = CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(applicationCode, null);
if (applicationId != 0) {
CoreRegisterLinker.getServiceInstanceInventoryRegister().getOrCreate(applicationId, applicationCode, applicationCode,
span.timestampAsLong(),
ZipkinTraceOSInfoBuilder.getOSInfoForZipkin(applicationCode));
}
}
CacheFactory.INSTANCE.get(config).addSpan(span);
});
if (config.isNeedAnalysis()) {
ZipkinSkyWalkingTransfer transfer = new ZipkinSkyWalkingTransfer();
transfer.doTransfer(config, spanList);
} else {
SpanForward forward = new SpanForward(config, receiver, serviceInventoryCache, endpointInventoryCache, encode);
forward.send(spanList);
}
}
private InputStream getInputStream(HttpServletRequest request) throws IOException {
......
......@@ -18,20 +18,29 @@
package org.apache.skywalking.oap.server.receiver.zipkin.handler;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.*;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.jetty.JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;
import zipkin2.codec.SpanBytesDecoder;
public class SpanV1JettyHandler extends JettyHandler {
private static final Logger logger = LoggerFactory.getLogger(SpanV2JettyHandler.class);
private ZipkinReceiverConfig config;
private SourceReceiver sourceReceiver;
private ServiceInventoryCache serviceInventoryCache;
private EndpointInventoryCache endpointInventoryCache;
public SpanV1JettyHandler(ZipkinReceiverConfig config) {
public SpanV1JettyHandler(ZipkinReceiverConfig config,
ModuleManager manager) {
sourceReceiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
serviceInventoryCache = manager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
endpointInventoryCache = manager.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class);
this.config = config;
}
......@@ -48,11 +57,13 @@ public class SpanV1JettyHandler extends JettyHandler {
try {
String type = request.getHeader("Content-Type");
SpanBytesDecoder decoder = type != null && type.contains("/x-thrift")
? SpanBytesDecoder.THRIFT
: SpanBytesDecoder.JSON_V1;
int encode = type != null && type.contains("/x-thrift") ? SpanEncode.THRIFT : SpanEncode.JSON_V1;
SpanProcessor processor = new SpanProcessor();
SpanBytesDecoder decoder = SpanEncode.isThrift(encode)
? SpanBytesDecoder.THRIFT
: SpanBytesDecoder.JSON_V1;
SpanProcessor processor = new SpanProcessor(sourceReceiver, serviceInventoryCache, endpointInventoryCache, encode);
processor.convert(config, decoder, request);
response.setStatus(202);
......
......@@ -18,12 +18,14 @@
package org.apache.skywalking.oap.server.receiver.zipkin.handler;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.*;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.jetty.JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;
import zipkin2.codec.SpanBytesDecoder;
/**
......@@ -33,8 +35,15 @@ public class SpanV2JettyHandler extends JettyHandler {
private static final Logger logger = LoggerFactory.getLogger(SpanV2JettyHandler.class);
private ZipkinReceiverConfig config;
private SourceReceiver sourceReceiver;
private ServiceInventoryCache serviceInventoryCache;
private EndpointInventoryCache endpointInventoryCache;
public SpanV2JettyHandler(ZipkinReceiverConfig config) {
public SpanV2JettyHandler(ZipkinReceiverConfig config,
ModuleManager manager) {
sourceReceiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
serviceInventoryCache = manager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
endpointInventoryCache = manager.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class);
this.config = config;
}
......@@ -51,11 +60,13 @@ public class SpanV2JettyHandler extends JettyHandler {
try {
String type = request.getHeader("Content-Type");
SpanBytesDecoder decoder = type != null && type.contains("/x-protobuf")
int encode = type != null && type.contains("/x-protobuf") ? SpanEncode.PROTO3 : SpanEncode.JSON_V2;
SpanBytesDecoder decoder = SpanEncode.isProto3(encode)
? SpanBytesDecoder.PROTO3
: SpanBytesDecoder.JSON_V2;
SpanProcessor processor = new SpanProcessor();
SpanProcessor processor = new SpanProcessor(sourceReceiver, serviceInventoryCache, endpointInventoryCache, encode);
processor.convert(config, decoder, request);
response.setStatus(202);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.trace;
import java.util.List;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.library.util.*;
import org.apache.skywalking.oap.server.receiver.zipkin.*;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanEncode;
import org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpan;
import zipkin2.Span;
import zipkin2.codec.SpanBytesEncoder;
/**
* @author wusheng
*/
public class SpanForward {
private ZipkinReceiverConfig config;
private SourceReceiver receiver;
private ServiceInventoryCache serviceInventoryCache;
private EndpointInventoryCache endpointInventoryCache;
private int encode;
public SpanForward(ZipkinReceiverConfig config, SourceReceiver receiver,
ServiceInventoryCache serviceInventoryCache,
EndpointInventoryCache endpointInventoryCache, int encode) {
this.config = config;
this.receiver = receiver;
this.serviceInventoryCache = serviceInventoryCache;
this.endpointInventoryCache = endpointInventoryCache;
this.encode = encode;
}
public void send(List<Span> spanList) {
spanList.forEach(span -> {
ZipkinSpan zipkinSpan = new ZipkinSpan();
zipkinSpan.setTraceId(span.traceId());
zipkinSpan.setSpanId(span.id());
String serviceName = span.localServiceName();
int serviceId = Const.NONE;
if (!StringUtil.isEmpty(serviceName)) {
serviceId = serviceInventoryCache.getServiceId(serviceName);
if (serviceId != Const.NONE) {
zipkinSpan.setServiceId(serviceId);
} else {
/**
* Only register, but don't wait.
* For this span, service id will be missed.
*/
CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(serviceName, null);
}
}
String spanName = span.name();
Span.Kind kind = span.kind();
switch (kind) {
case SERVER:
case CONSUMER:
if (!StringUtil.isEmpty(spanName) && serviceId != Const.NONE) {
int endpointId = endpointInventoryCache.getEndpointId(serviceId, spanName,
DetectPoint.SERVER.ordinal());
if (endpointId != Const.NONE) {
zipkinSpan.setEndpointId(endpointId);
} else if (config.isRegisterZipkinEndpoint()) {
CoreRegisterLinker.getEndpointInventoryRegister().getOrCreate(serviceId, spanName, DetectPoint.SERVER);
}
}
}
if (!StringUtil.isEmpty(spanName)) {
zipkinSpan.setEndpointName(spanName);
}
long startTime = span.timestampAsLong() / 1000;
zipkinSpan.setStartTime(startTime);
if (startTime != 0) {
long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(zipkinSpan.getStartTime());
zipkinSpan.setTimeBucket(timeBucket);
}
long latency = span.durationAsLong() / 1000;
zipkinSpan.setEndTime(startTime + latency);
zipkinSpan.setIsError(BooleanUtils.booleanToValue(false));
zipkinSpan.setEncode(SpanEncode.PROTO3);
zipkinSpan.setLatency((int)latency);
zipkinSpan.setDataBinary(SpanBytesEncoder.PROTO3.encode(span));
receiver.receive(zipkinSpan);
});
}
}
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.transform;
package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform;
import com.google.gson.JsonObject;
import java.io.UnsupportedEncodingException;
......@@ -26,7 +26,7 @@ import org.apache.skywalking.apm.network.language.agent.v2.*;
import org.apache.skywalking.oap.server.core.register.NodeType;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker;
import org.apache.skywalking.oap.server.receiver.zipkin.data.*;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.*;
import org.junit.*;
import org.powermock.reflect.Whitebox;
import zipkin2.Span;
......
......@@ -126,6 +126,11 @@
<artifactId>storage-elasticsearch-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>storage-zipkin-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- storage module -->
<!-- queryBuild module -->
......
......@@ -30,6 +30,7 @@
<modules>
<module>storage-jdbc-hikaricp-plugin</module>
<module>storage-elasticsearch-plugin</module>
<module>storage-zipkin-plugin</module>
</modules>
</project>
\ No newline at end of file
......@@ -59,7 +59,7 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
this.password = password;
}
int getIndexShardsNumber() {
public int getIndexShardsNumber() {
return indexShardsNumber;
}
......@@ -67,7 +67,7 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
this.indexShardsNumber = indexShardsNumber;
}
int getIndexReplicasNumber() {
public int getIndexReplicasNumber() {
return indexReplicasNumber;
}
......
......@@ -70,8 +70,8 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
private static final Logger logger = LoggerFactory.getLogger(StorageModuleElasticsearchProvider.class);
private final StorageModuleElasticsearchConfig config;
private ElasticSearchClient elasticSearchClient;
protected final StorageModuleElasticsearchConfig config;
protected ElasticSearchClient elasticSearchClient;
public StorageModuleElasticsearchProvider() {
super();
......
......@@ -147,4 +147,8 @@ public class TraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
}
return segmentRecords;
}
@Override public List<Span> doFlexibleTraceQuery(String traceId) throws IOException {
return Collections.emptyList();
}
}
......@@ -166,6 +166,10 @@ public class H2TraceQueryDAO implements ITraceQueryDAO {
return segmentRecords;
}
@Override public List<Span> doFlexibleTraceQuery(String traceId) throws IOException {
return Collections.emptyList();
}
protected JDBCHikariCPClient getClient() {
return h2Client;
}
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>server-storage-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>6.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>storage-zipkin-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>storage-elasticsearch-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.zipkin.zipkin2</groupId>
<artifactId>zipkin</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.zipkin;
import lombok.*;
import org.apache.skywalking.oap.server.core.source.*;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.*;
/**
* @author peng-yongsheng
*/
@ScopeDeclaration(id = ZIPKIN_SPAN, name = "ZipkinSpan")
public class ZipkinSpan extends Source {
@Override public int scope() {
return DefaultScopeDefine.ZIPKIN_SPAN;
}
@Override public String getEntityId() {
return traceId + spanId;
}
@Setter @Getter private String traceId;
@Setter @Getter private String spanId;
@Setter @Getter private int serviceId;
@Setter @Getter private int serviceInstanceId;
@Setter @Getter private String endpointName;
@Setter @Getter private int endpointId;
@Setter @Getter private long startTime;
@Setter @Getter private long endTime;
@Setter @Getter private int latency;
@Setter @Getter private int isError;
@Setter @Getter private byte[] dataBinary;
@Setter @Getter private int encode;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.zipkin;
import java.util.*;
import lombok.*;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordType;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
@RecordType
@StorageEntity(name = ZipkinSpanRecord.INDEX_NAME, builder = ZipkinSpanRecord.Builder.class, sourceScopeId = DefaultScopeDefine.ZIPKIN_SPAN)
public class ZipkinSpanRecord extends Record {
public static final String INDEX_NAME = "zipkin_span";
public static final String TRACE_ID = "trace_id";
public static final String SPAN_ID = "span_id";
public static final String SERVICE_ID = "service_id";
public static final String SERVICE_INSTANCE_ID = "service_instance_id";
public static final String ENDPOINT_NAME = "endpoint_name";
public static final String ENDPOINT_ID = "endpoint_id";
public static final String START_TIME = "start_time";
public static final String END_TIME = "end_time";
public static final String LATENCY = "latency";
public static final String IS_ERROR = "is_error";
public static final String DATA_BINARY = "data_binary";
public static final String ENCODE = "encode";
@Setter @Getter @Column(columnName = TRACE_ID) @IDColumn private String traceId;
@Setter @Getter @Column(columnName = SPAN_ID) @IDColumn private String spanId;
@Setter @Getter @Column(columnName = SERVICE_ID) @IDColumn private int serviceId;
@Setter @Getter @Column(columnName = SERVICE_INSTANCE_ID) @IDColumn private int serviceInstanceId;
@Setter @Getter @Column(columnName = ENDPOINT_NAME, matchQuery = true) @IDColumn private String endpointName;
@Setter @Getter @Column(columnName = ENDPOINT_ID) @IDColumn private int endpointId;
@Setter @Getter @Column(columnName = START_TIME) @IDColumn private long startTime;
@Setter @Getter @Column(columnName = END_TIME) @IDColumn private long endTime;
@Setter @Getter @Column(columnName = LATENCY) @IDColumn private int latency;
@Setter @Getter @Column(columnName = IS_ERROR) @IDColumn private int isError;
@Setter @Getter @Column(columnName = DATA_BINARY) @IDColumn private byte[] dataBinary;
@Setter @Getter @Column(columnName = ENCODE) @IDColumn private int encode;
@Override public String id() {
return traceId + "-" + spanId;
}
public static class Builder implements StorageBuilder<ZipkinSpanRecord> {
@Override public Map<String, Object> data2Map(ZipkinSpanRecord storageData) {
Map<String, Object> map = new HashMap<>();
map.put(TRACE_ID, storageData.getTraceId());
map.put(SPAN_ID, storageData.getSpanId());
map.put(SERVICE_ID, storageData.getServiceId());
map.put(SERVICE_INSTANCE_ID, storageData.getServiceInstanceId());
map.put(ENDPOINT_NAME, storageData.getEndpointName());
map.put(ENDPOINT_ID, storageData.getEndpointId());
map.put(START_TIME, storageData.getStartTime());
map.put(END_TIME, storageData.getEndTime());
map.put(LATENCY, storageData.getLatency());
map.put(IS_ERROR, storageData.getIsError());
map.put(TIME_BUCKET, storageData.getTimeBucket());
if (CollectionUtils.isEmpty(storageData.getDataBinary())) {
map.put(DATA_BINARY, Const.EMPTY_STRING);
} else {
map.put(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary())));
}
map.put(ENCODE, storageData.getEncode());
return map;
}
@Override public ZipkinSpanRecord map2Data(Map<String, Object> dbMap) {
ZipkinSpanRecord record = new ZipkinSpanRecord();
record.setTraceId((String)dbMap.get(TRACE_ID));
record.setSpanId((String)dbMap.get(SPAN_ID));
record.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue());
record.setServiceInstanceId(((Number)dbMap.get(SERVICE_INSTANCE_ID)).intValue());
record.setEndpointName((String)dbMap.get(ENDPOINT_NAME));
record.setEndpointId(((Number)dbMap.get(ENDPOINT_ID)).intValue());
record.setStartTime(((Number)dbMap.get(START_TIME)).longValue());
record.setEndTime(((Number)dbMap.get(END_TIME)).longValue());
record.setLatency(((Number)dbMap.get(LATENCY)).intValue());
record.setIsError(((Number)dbMap.get(IS_ERROR)).intValue());
record.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
if (StringUtil.isEmpty((String)dbMap.get(DATA_BINARY))) {
record.setDataBinary(new byte[] {});
} else {
record.setDataBinary(Base64.getDecoder().decode((String)dbMap.get(DATA_BINARY)));
}
record.setEncode(((Number)dbMap.get(ENCODE)).intValue());
return record;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.zipkin;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordProcess;
/**
* Dispatch for Zipkin native mode spans.
*
* @author wusheng
*/
public class ZipkinSpanRecordDispatcher implements SourceDispatcher<ZipkinSpan> {
@Override public void dispatch(ZipkinSpan source) {
ZipkinSpanRecord segment = new ZipkinSpanRecord();
segment.setTraceId(source.getTraceId());
segment.setSpanId(source.getSpanId());
segment.setServiceId(source.getServiceId());
segment.setServiceInstanceId(source.getServiceInstanceId());
segment.setEndpointName(source.getEndpointName());
segment.setEndpointId(source.getEndpointId());
segment.setStartTime(source.getStartTime());
segment.setEndTime(source.getEndTime());
segment.setLatency(source.getLatency());
segment.setIsError(source.getIsError());
segment.setDataBinary(source.getDataBinary());
segment.setTimeBucket(source.getTimeBucket());
segment.setEncode(source.getEncode());
RecordProcess.INSTANCE.in(segment);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.zipkin.elasticsearch;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchProvider;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class ZipkinStorageModuleElasticsearchProvider extends StorageModuleElasticsearchProvider {
private static final Logger logger = LoggerFactory.getLogger(ZipkinStorageModuleElasticsearchProvider.class);
private ZipkinTraceQueryEsDAO traceQueryEsDAO;
@Override
public String name() {
return "zipkin-elasticsearch";
}
@Override
public void prepare() throws ServiceNotProvidedException {
super.prepare();
traceQueryEsDAO = new ZipkinTraceQueryEsDAO(elasticSearchClient);
this.registerServiceImplementation(ITraceQueryDAO.class, traceQueryEsDAO);
}
@Override public void notifyAfterCompleted() {
super.notifyAfterCompleted();
traceQueryEsDAO.setServiceInventoryCache(getManager().find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class));
}
@Override
public String[] requiredModules() {
return new String[] {CoreModule.NAME};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.zipkin.elasticsearch;
import com.google.common.base.Strings;
import java.io.IOException;
import java.util.*;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.*;
import org.elasticsearch.search.aggregations.bucket.terms.*;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.search.aggregations.metrics.min.Min;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
import static org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord.*;
public class ZipkinTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
@Setter
private ServiceInventoryCache serviceInventoryCache;
public ZipkinTraceQueryEsDAO(
ElasticSearchClient client) {
super(client);
}
@Override
public TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration,
String endpointName, int serviceId, int serviceInstanceId, int endpointId, String traceId, int limit, int from,
TraceState traceState, QueryOrder queryOrder) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
sourceBuilder.query(boolQueryBuilder);
List<QueryBuilder> mustQueryList = boolQueryBuilder.must();
if (startSecondTB != 0 && endSecondTB != 0) {
mustQueryList.add(QueryBuilders.rangeQuery(TIME_BUCKET).gte(startSecondTB).lte(endSecondTB));
}
if (minDuration != 0 || maxDuration != 0) {
RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(LATENCY);
if (minDuration != 0) {
rangeQueryBuilder.gte(minDuration);
}
if (maxDuration != 0) {
rangeQueryBuilder.lte(maxDuration);
}
boolQueryBuilder.must().add(rangeQueryBuilder);
}
if (!Strings.isNullOrEmpty(endpointName)) {
mustQueryList.add(QueryBuilders.matchPhraseQuery(ENDPOINT_NAME, endpointName));
}
if (serviceId != 0) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(SERVICE_ID, serviceId));
}
if (serviceInstanceId != 0) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(SERVICE_INSTANCE_ID, serviceInstanceId));
}
if (endpointId != 0) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(ENDPOINT_ID, endpointId));
}
if (!Strings.isNullOrEmpty(traceId)) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(TRACE_ID, traceId));
}
switch (traceState) {
case ERROR:
mustQueryList.add(QueryBuilders.matchQuery(IS_ERROR, BooleanUtils.TRUE));
break;
case SUCCESS:
mustQueryList.add(QueryBuilders.matchQuery(IS_ERROR, BooleanUtils.FALSE));
break;
}
TermsAggregationBuilder builder = AggregationBuilders.terms(TRACE_ID).field(TRACE_ID).size(limit)
.subAggregation(
AggregationBuilders.max(LATENCY).field(LATENCY)
)
.subAggregation(
AggregationBuilders.min(START_TIME).field(START_TIME)
);
switch (queryOrder) {
case BY_START_TIME:
builder.order(BucketOrder.aggregation(START_TIME, false));
break;
case BY_DURATION:
builder.order(BucketOrder.aggregation(LATENCY, false));
break;
}
sourceBuilder.aggregation(builder);
SearchResponse response = getClient().search(ZipkinSpanRecord.INDEX_NAME, sourceBuilder);
TraceBrief traceBrief = new TraceBrief();
Terms terms = response.getAggregations().get(TRACE_ID);
for (Terms.Bucket termsBucket : terms.getBuckets()) {
BasicTrace basicTrace = new BasicTrace();
basicTrace.setSegmentId(termsBucket.getKeyAsString());
Min startTime = termsBucket.getAggregations().get(START_TIME);
Max latency = termsBucket.getAggregations().get(LATENCY);
basicTrace.setStart(String.valueOf((long)startTime.getValue()));
basicTrace.getEndpointNames().add("");
basicTrace.setDuration((int)latency.getValue());
basicTrace.setError(false);
basicTrace.getTraceIds().add(termsBucket.getKeyAsString());
traceBrief.getTraces().add(basicTrace);
}
return traceBrief;
}
@Override public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
return Collections.emptyList();
}
@Override public List<org.apache.skywalking.oap.server.core.query.entity.Span> doFlexibleTraceQuery(
String traceId) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.termQuery(TRACE_ID, traceId));
sourceBuilder.sort(START_TIME, SortOrder.ASC);
sourceBuilder.size(1000);
SearchResponse response = getClient().search(ZipkinSpanRecord.INDEX_NAME, sourceBuilder);
List<org.apache.skywalking.oap.server.core.query.entity.Span> spanList = new ArrayList<>();
boolean isFirst = true;
for (SearchHit searchHit : response.getHits().getHits()) {
int serviceId = ((Number)searchHit.getSourceAsMap().get(SERVICE_ID)).intValue();
String dataBinaryBase64 = (String)searchHit.getSourceAsMap().get(SegmentRecord.DATA_BINARY);
Span span = SpanBytesDecoder.PROTO3.decodeOne(Base64.getDecoder().decode(dataBinaryBase64));
org.apache.skywalking.oap.server.core.query.entity.Span swSpan = new org.apache.skywalking.oap.server.core.query.entity.Span();
swSpan.setTraceId(span.traceId());
swSpan.setEndpointName(span.name());
swSpan.setStartTime(span.timestamp() / 1000);
swSpan.setEndTime(swSpan.getStartTime() + span.durationAsLong() / 1000);
span.tags().forEach((key, value) -> {
swSpan.getTags().add(new KeyValue(key, value));
});
span.annotations().forEach(annotation -> {
LogEntity entity = new LogEntity();
entity.setTime(annotation.timestamp() / 1000);
entity.getData().add(new KeyValue("annotation", annotation.value()));
swSpan.getLogs().add(entity);
});
if (serviceId != Const.NONE) {
swSpan.setServiceCode(serviceInventoryCache.get(serviceId).getName());
}
swSpan.setSpanId(0);
swSpan.setParentSpanId(-1);
swSpan.setSegmentSpanId(span.id());
swSpan.setSegmentId(span.id());
Span.Kind kind = span.kind();
switch (kind) {
case CLIENT:
case PRODUCER:
swSpan.setType("Entry");
break;
case SERVER:
case CONSUMER:
swSpan.setType("Exit");
break;
default:
swSpan.setType("Local");
}
if (isFirst) {
swSpan.setRoot(true);
swSpan.setSegmentParentSpanId("");
isFirst = false;
} else {
Ref ref = new Ref();
ref.setTraceId(span.traceId());
ref.setParentSegmentId(span.parentId());
ref.setType(RefType.CROSS_PROCESS);
ref.setParentSpanId(0);
swSpan.getRefs().add(ref);
swSpan.setSegmentParentSpanId(span.parentId());
}
spanList.add(swSpan);
}
return spanList;
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.storage.plugin.zipkin.elasticsearch.ZipkinStorageModuleElasticsearchProvider
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册