提交 93a45165 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Merge pull request #265 from wu-sheng/feature/254

Feature/254
......@@ -2,14 +2,20 @@ package org.skywalking.apm.collector;
import akka.actor.ActorSystem;
import akka.actor.Props;
import java.io.IOException;
import java.util.ServiceLoader;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractClusterWorkerProvider;
import org.skywalking.apm.collector.actor.AbstractLocalWorkerProvider;
import org.skywalking.apm.collector.actor.AbstractWorker;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LookUp;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.UsedRoleNameException;
import org.skywalking.apm.collector.cluster.WorkersListener;
import org.skywalking.apm.collector.config.ConfigInitializer;
import java.io.IOException;
import java.util.ServiceLoader;
import org.skywalking.apm.collector.rpc.RPCAddressListener;
/**
* @author pengys5
......@@ -38,6 +44,7 @@ public class CollectorSystem {
private void createListener() {
clusterContext.getAkkaSystem().actorOf(Props.create(WorkersListener.class, clusterContext), WorkersListener.WORK_NAME);
clusterContext.getAkkaSystem().actorOf(Props.create(RPCAddressListener.class, clusterContext), RPCAddressListener.WORK_NAME);
}
private void createClusterWorkers() throws ProviderNotFoundException {
......
......@@ -8,6 +8,9 @@ import akka.cluster.MemberStatus;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.cluster.WorkerListenerMessage;
import org.skywalking.apm.collector.cluster.WorkersListener;
import org.skywalking.apm.collector.rpc.RPCAddress;
import org.skywalking.apm.collector.rpc.RPCAddressListener;
import org.skywalking.apm.collector.rpc.RPCAddressListenerMessage;
import org.skywalking.apm.collector.log.LogManager;
/**
......@@ -56,10 +59,12 @@ public abstract class AbstractClusterWorker extends AbstractWorker {
private Cluster cluster;
private final AbstractClusterWorker ownerWorker;
private final RPCAddress RPCAddress;
public WorkerWithAkka(AbstractClusterWorker ownerWorker) {
public WorkerWithAkka(AbstractClusterWorker ownerWorker, RPCAddress RPCAddress) {
this.ownerWorker = ownerWorker;
cluster = Cluster.get(getContext().system());
this.RPCAddress = RPCAddress;
}
@Override
......@@ -108,6 +113,11 @@ public abstract class AbstractClusterWorker extends AbstractWorker {
logger.info("member address: %s, worker path: %s", member.address().toString(), getSelf().path().toString());
getContext().actorSelection(member.address() + "/user/" + WorkersListener.WORK_NAME).tell(registerMessage, getSelf());
}
if (member.hasRole(RPCAddressListener.WORK_NAME) && RPCAddress != null) {
RPCAddressListenerMessage.ConfigMessage configMessage = new RPCAddressListenerMessage.ConfigMessage(RPCAddress);
logger.info("member address: %s, worker path: %s", member.address().toString(), getSelf().path().toString());
getContext().actorSelection(member.address() + "/user/" + RPCAddressListener.WORK_NAME).tell(configMessage, getSelf());
}
}
}
}
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.actor;
import akka.actor.ActorRef;
import akka.actor.Props;
import org.skywalking.apm.collector.rpc.RPCAddress;
/**
* The <code>AbstractClusterWorkerProvider</code> implementations represent providers,
......@@ -20,6 +21,10 @@ public abstract class AbstractClusterWorkerProvider<T extends AbstractClusterWor
*/
public abstract int workerNum();
public RPCAddress config() {
return null;
}
/**
* Create the worker instance into akka system, the akka system will control the cluster worker life cycle.
*
......@@ -35,10 +40,14 @@ public abstract class AbstractClusterWorkerProvider<T extends AbstractClusterWor
T clusterWorker = workerInstance(getClusterContext());
clusterWorker.preStart();
ActorRef actorRef = getClusterContext().getAkkaSystem().actorOf(Props.create(AbstractClusterWorker.WorkerWithAkka.class, clusterWorker), role().roleName() + "_" + num);
ActorRef actorRef = getClusterContext().getAkkaSystem().actorOf(Props.create(AbstractClusterWorker.WorkerWithAkka.class, clusterWorker, config()), role().roleName() + "_" + num);
ClusterWorkerRef workerRef = new ClusterWorkerRef(actorRef, role());
getClusterContext().put(workerRef);
if (config() != null) {
getClusterContext().getRpcContext().putAddress("Self", config());
}
return workerRef;
}
}
package org.skywalking.apm.collector.actor;
import akka.actor.ActorSystem;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.rpc.RPCAddressContext;
/**
* @author pengys5
......@@ -15,6 +15,7 @@ public class ClusterWorkerContext extends WorkerContext {
private final ActorSystem akkaSystem;
private Map<String, AbstractWorkerProvider> providers = new ConcurrentHashMap<>();
private RPCAddressContext rpcContext = new RPCAddressContext();
public ClusterWorkerContext(ActorSystem akkaSystem) {
this.akkaSystem = akkaSystem;
......@@ -43,4 +44,8 @@ public class ClusterWorkerContext extends WorkerContext {
providers.put(provider.role().roleName(), provider);
}
}
public RPCAddressContext getRpcContext() {
return rpcContext;
}
}
package org.skywalking.apm.collector.cluster;
import java.io.Serializable;
import org.skywalking.apm.collector.actor.AbstractWorker;
import org.skywalking.apm.collector.actor.Role;
import java.io.Serializable;
/**
* <code>WorkerListenerMessage</code> is a message just for the worker
* implementation of the {@link AbstractWorker}
......
......@@ -30,7 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
public class WorkersListener extends UntypedActor {
public static final String WORK_NAME = "WorkersListener";
private static final Logger logger = LogManager.getFormatterLogger(WorkersListener.class);
private final Logger logger = LogManager.getFormatterLogger(WorkersListener.class);
private final ClusterWorkerContext clusterContext;
private Cluster cluster = Cluster.get(getContext().system());
private Map<ActorRef, ClusterWorkerRef> relation = new ConcurrentHashMap<>();
......
package org.skywalking.apm.collector.rpc;
/**
* @author pengys5
*/
public class RPCAddress {
private final String address;
private final int port;
public RPCAddress(String address, int port) {
this.address = address;
this.port = port;
}
public String getAddress() {
return address;
}
public int getPort() {
return port;
}
}
package org.skywalking.apm.collector.rpc;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author pengys5
*/
public class RPCAddressContext {
private Map<String, RPCAddress> rpcAddresses = new ConcurrentHashMap<>();
public Collection<RPCAddress> rpcAddressCollection() {
return rpcAddresses.values();
}
public void putAddress(String ownerAddress, RPCAddress rpcAddress) {
rpcAddresses.put(ownerAddress, rpcAddress);
}
public void removeAddress(String ownerAddress) {
rpcAddresses.remove(ownerAddress);
}
}
package org.skywalking.apm.collector.rpc;
import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
/**
* @author pengys5
*/
public class RPCAddressListener extends UntypedActor {
private final Logger logger = LogManager.getFormatterLogger(RPCAddressListener.class);
public static final String WORK_NAME = "RPCAddressListener";
private final ClusterWorkerContext clusterContext;
private Cluster cluster = Cluster.get(getContext().system());
public RPCAddressListener(ClusterWorkerContext clusterContext) {
this.clusterContext = clusterContext;
}
@Override
public void preStart() throws Exception {
cluster.subscribe(getSelf(), ClusterEvent.UnreachableMember.class);
}
@Override
public void onReceive(Object message) throws Throwable {
if (message instanceof RPCAddressListenerMessage.ConfigMessage) {
RPCAddressListenerMessage.ConfigMessage configMessage = (RPCAddressListenerMessage.ConfigMessage)message;
ActorRef sender = getSender();
logger.info("address: %s, port: %s", configMessage.getConfig().getAddress(), configMessage.getConfig().getPort());
String ownerAddress = sender.path().address().hostPort();
clusterContext.getRpcContext().putAddress(ownerAddress, configMessage.getConfig());
} else if (message instanceof Terminated) {
Terminated terminated = (Terminated)message;
clusterContext.getRpcContext().removeAddress(terminated.getActor().path().address().hostPort());
} else if (message instanceof ClusterEvent.UnreachableMember) {
ClusterEvent.UnreachableMember unreachableMember = (ClusterEvent.UnreachableMember)message;
clusterContext.getRpcContext().removeAddress(unreachableMember.member().address().hostPort());
} else {
unhandled(message);
}
}
}
package org.skywalking.apm.collector.rpc;
import java.io.Serializable;
/**
* @author pengys5
*/
public class RPCAddressListenerMessage {
public static class ConfigMessage implements Serializable {
private final RPCAddress config;
public ConfigMessage(RPCAddress config) {
this.config = config;
}
public RPCAddress getConfig() {
return config;
}
}
}
package org.skywalking.apm.collector.commons.serializer;
import akka.serialization.JSerializer;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
/**
* @author pengys5
*/
public class JsonSerializer extends JSerializer {
@Override
public boolean includeManifest() {
return false;
}
@Override
public int identifier() {
return 31;
}
@Override
public byte[] toBinary(Object o) {
JsonObject jsonObject = (JsonObject) o;
return jsonObject.toString().getBytes();
}
@Override
public Object fromBinaryJava(byte[] bytes, Class<?> manifest) {
Gson gson = new Gson();
return gson.fromJson(new String(bytes), JsonObject.class);
}
}
......@@ -6,14 +6,14 @@ akka {
java = "akka.serialization.JavaSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
// TraceSegment = "org.skywalking.apm.collector.worker.TraceSegmentSerializer"
json = "org.skywalking.apm.collector.commons.serializer.JsonSerializer"
// json = "org.skywalking.apm.collector.commons.serializer.JsonSerializer"
}
serialization-bindings {
"java.lang.String" = java
"com.google.protobuf.Message" = proto
// "TraceSegment" = TraceSegment
"com.google.gson.JsonObject" = json
// "com.google.gson.JsonObject" = json
}
warn-about-java-serializer-usage = on
......
......@@ -30,6 +30,11 @@
<artifactId>apm-collector-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-network</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
......@@ -43,7 +48,7 @@
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.2.2</version>
<version>5.4.3</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
......
......@@ -4,6 +4,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.CollectorSystem;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.worker.grpcserver.GRPCServer;
import org.skywalking.apm.collector.worker.httpserver.HttpServer;
import org.skywalking.apm.collector.worker.storage.EsClient;
import org.skywalking.apm.collector.worker.storage.IndexCreator;
......@@ -23,5 +24,6 @@ public class CollectorBootStartUp {
IndexCreator.INSTANCE.create();
PersistenceTimer.INSTANCE.boot();
HttpServer.INSTANCE.boot((ClusterWorkerContext) collectorSystem.getClusterContext());
GRPCServer.INSTANCE.boot((ClusterWorkerContext) collectorSystem.getClusterContext());
}
}
package org.skywalking.apm.collector.worker.config;
/**
* @author pengys5
*/
public class GRPCConfig {
public static class GRPC {
public static String HOSTNAME = "";
public static String PORT = "";
}
}
package org.skywalking.apm.collector.worker.config;
import org.skywalking.apm.collector.config.ConfigProvider;
import org.skywalking.apm.util.StringUtil;
/**
* @author pengys5
*/
public class GRPCConfigProvider implements ConfigProvider {
@Override public Class configClass() {
return GRPCConfig.class;
}
@Override public void cliArgs() {
if (!StringUtil.isEmpty(System.getProperty("grpc.HOSTNAME"))) {
GRPCConfig.GRPC.HOSTNAME = System.getProperty("grpc.HOSTNAME");
}
if (!StringUtil.isEmpty(System.getProperty("grpc.PORT"))) {
GRPCConfig.GRPC.PORT = System.getProperty("grpc.PORT");
}
}
}
......@@ -6,6 +6,12 @@ package org.skywalking.apm.collector.worker.config;
public class WorkerConfig {
public static class WorkerNum {
public static class GRPC {
public static class GRPCAddressRegister {
public static int VALUE = 1;
}
}
public static class Node {
public static class NodeCompAgg {
public static int VALUE = 2;
......
package org.skywalking.apm.collector.worker.globaltrace;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.util.Arrays;
import java.util.Map;
......@@ -30,13 +31,17 @@ public class GlobalTraceGetWithGlobalId extends AbstractGet {
super(role, clusterContext, selfContext);
}
@Override protected Class<? extends JsonElement> responseClass() {
return JsonObject.class;
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(GlobalTraceSearchWithGlobalId.WorkerRole.INSTANCE).create(this);
}
@Override protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
JsonElement response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
if (!parameter.containsKey("globalId")) {
throw new IllegalArgumentException("the request parameter must contains globalId");
}
......
package org.skywalking.apm.collector.worker.globaltrace.analysis;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.AbstractLocalAsyncWorkerProvider;
......@@ -14,9 +13,8 @@ import org.skywalking.apm.collector.worker.JoinAndSplitAnalysisMember;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
import org.skywalking.apm.collector.worker.globaltrace.GlobalTraceIndex;
import org.skywalking.apm.collector.worker.globaltrace.persistence.GlobalTraceAgg;
import org.skywalking.apm.collector.worker.segment.SegmentPost;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.tools.CollectionTools;
import org.skywalking.apm.collector.worker.segment.SegmentReceiver;
import org.skywalking.apm.network.proto.TraceSegmentObject;
/**
* @author pengys5
......@@ -31,18 +29,16 @@ public class GlobalTraceAnalysis extends JoinAndSplitAnalysisMember {
@Override
public void analyse(Object message) {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
Segment segment = segmentWithTimeSlice.getSegment();
if (message instanceof SegmentReceiver.SegmentWithTimeSlice) {
SegmentReceiver.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentReceiver.SegmentWithTimeSlice)message;
TraceSegmentObject segment = segmentWithTimeSlice.getSegment();
String subSegmentId = segment.getTraceSegmentId();
List<String> globalTraceIdList = segment.getRelatedGlobalTraces().get();
if (CollectionTools.isNotEmpty(globalTraceIdList)) {
for (String globalTraceId : globalTraceIdList) {
set(globalTraceId, GlobalTraceIndex.SUB_SEG_IDS, subSegmentId);
}
}
segmentWithTimeSlice.getGlobalTraceIds().forEach(globalTraceId -> {
set(globalTraceId, GlobalTraceIndex.SUB_SEG_IDS, subSegmentId);
});
} else {
logger.error("unhandled message, message instance must SegmentPost.SegmentWithTimeSlice, but is %s", message.getClass().toString());
logger.error("unhandled message, message instance must SegmentReceiver.SegmentWithTimeSlice, but is %s", message.getClass().toString());
}
}
......
......@@ -2,12 +2,12 @@ package org.skywalking.apm.collector.worker.globaltrace.persistence;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.get.GetResponse;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
......@@ -18,14 +18,14 @@ import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.globaltrace.GlobalTraceIndex;
import org.skywalking.apm.collector.worker.segment.SegmentIndex;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.entity.SegmentDeserialize;
import org.skywalking.apm.collector.worker.segment.entity.Span;
import org.skywalking.apm.collector.worker.segment.entity.SpanView;
import org.skywalking.apm.collector.worker.segment.entity.TraceSegmentRef;
import org.skywalking.apm.collector.worker.storage.GetResponseFromEs;
import org.skywalking.apm.collector.worker.storage.JoinAndSplitData;
import org.skywalking.apm.collector.worker.tools.CollectionTools;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.network.proto.TraceSegmentReference;
import org.skywalking.apm.util.StringUtil;
/**
......@@ -56,20 +56,16 @@ public class GlobalTraceSearchWithGlobalId extends AbstractLocalSyncWorker {
List<SpanView> spanViewList = new ArrayList<>();
for (String subSegId : subSegIds) {
logger.debug("subSegId: %s", subSegId);
String segmentSource = GetResponseFromEs.INSTANCE.get(SegmentIndex.INDEX, SegmentIndex.TYPE_RECORD, subSegId).getSourceAsString();
logger.debug("segmentSource: %s", segmentSource);
Segment segment = null;
try {
segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentSource);
} catch (IOException e) {
throw new WorkerException(e.getMessage(), e);
}
GetResponse getResponse = GetResponseFromEs.INSTANCE.get(SegmentIndex.INDEX, SegmentIndex.TYPE_RECORD, subSegId);
String segmentObjBlob = (String)getResponse.getSource().get(SegmentIndex.SEGMENT_OBJ_BLOB);
TraceSegmentObject segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentObjBlob);
String segmentId = segment.getTraceSegmentId();
List<TraceSegmentRef> refsList = segment.getRefs();
List<TraceSegmentReference> refsList = segment.getRefsList();
for (Span span : segment.getSpans()) {
for (SpanObject span : segment.getSpansList()) {
logger.debug(span.getOperationName());
spansDataBuild(span, segment.getApplicationCode(), segmentId, spanViewList, refsList);
spansDataBuild(span, segment.getTraceSegmentId(), segmentId, spanViewList, refsList);
}
}
......@@ -128,8 +124,8 @@ public class GlobalTraceSearchWithGlobalId extends AbstractLocalSyncWorker {
return tempList;
}
private void spansDataBuild(Span span, String appCode, String segmentId, List<SpanView> spanViewList,
List<TraceSegmentRef> refsList) {
private void spansDataBuild(SpanObject span, String appCode, String segmentId, List<SpanView> spanViewList,
List<TraceSegmentReference> refsList) {
int spanId = span.getSpanId();
String spanSegId = segmentId + "--" + String.valueOf(spanId);
......@@ -152,9 +148,9 @@ public class GlobalTraceSearchWithGlobalId extends AbstractLocalSyncWorker {
if (refsList.size() > 1) {
throw new UnsupportedOperationException("not support batch call");
} else {
TraceSegmentRef segmentRef = refsList.get(0);
int parentSpanId = segmentRef.getSpanId();
String parentSegId = segmentRef.getTraceSegmentId();
TraceSegmentReference segmentRef = refsList.get(0);
int parentSpanId = segmentRef.getParentSpanId();
String parentSegId = segmentRef.getParentTraceSegmentId();
String parentSpanSegId = parentSegId + "--" + String.valueOf(parentSpanId);
spanView.setParentSpanSegId(parentSpanSegId);
......
package org.skywalking.apm.collector.worker.grpcserver;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.worker.httpserver.ArgumentsParseException;
/**
* @author pengys5
*/
public abstract class AbstractReceiver extends AbstractLocalSyncWorker {
public AbstractReceiver(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
final @Override protected void onWork(Object request, Object response) throws WorkerException {
onReceive(request);
}
protected abstract void onReceive(
Object request) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException;
}
package org.skywalking.apm.collector.worker.grpcserver;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorkerProvider;
/**
* @author pengys5
*/
public abstract class AbstractReceiverProvider<T extends AbstractLocalSyncWorker> extends AbstractLocalSyncWorkerProvider<T> {
}
package org.skywalking.apm.collector.worker.grpcserver;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import java.util.Map;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.httpserver.AbstractGet;
import org.skywalking.apm.collector.worker.httpserver.AbstractGetProvider;
import org.skywalking.apm.collector.worker.httpserver.ArgumentsParseException;
/**
* @author pengys5
*/
public class GRPCAddressGet extends AbstractGet {
protected GRPCAddressGet(org.skywalking.apm.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override protected Class<JsonArray> responseClass() {
return JsonArray.class;
}
@Override protected void onReceive(Map<String, String[]> parameter,
JsonElement response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
((ClusterWorkerContext)getClusterContext()).getRpcContext().rpcAddressCollection().forEach(rpcAddress -> {
((JsonArray)response).add(rpcAddress.getAddress() + ":" + rpcAddress.getPort());
});
}
public static class Factory extends AbstractGetProvider<GRPCAddressGet> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public GRPCAddressGet workerInstance(ClusterWorkerContext clusterContext) {
return new GRPCAddressGet(role(), clusterContext, new LocalWorkerContext());
}
@Override
public String servletPath() {
return "/grpc/addresses";
}
}
public enum Role implements org.skywalking.apm.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return GRPCAddressGet.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package org.skywalking.apm.collector.worker.grpcserver;
import org.skywalking.apm.collector.actor.AbstractClusterWorker;
import org.skywalking.apm.collector.actor.AbstractClusterWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.rpc.RPCAddress;
import org.skywalking.apm.collector.worker.config.GRPCConfig;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
/**
* @author pengys5
*/
public class GRPCAddressRegister extends AbstractClusterWorker {
GRPCAddressRegister(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override public void preStart() throws ProviderNotFoundException {
}
@Override protected void onWork(Object message) throws WorkerException {
}
public static class Factory extends AbstractClusterWorkerProvider<GRPCAddressRegister> {
public static Factory INSTANCE = new Factory();
@Override
public GRPCAddressRegister.Role role() {
return Role.INSTANCE;
}
@Override public int workerNum() {
return WorkerConfig.WorkerNum.GRPC.GRPCAddressRegister.VALUE;
}
@Override public RPCAddress config() {
return new RPCAddress(GRPCConfig.GRPC.HOSTNAME, Integer.valueOf(GRPCConfig.GRPC.PORT));
}
@Override public GRPCAddressRegister workerInstance(ClusterWorkerContext clusterContext) {
return new GRPCAddressRegister(role(), clusterContext, new LocalWorkerContext());
}
}
public enum Role implements org.skywalking.apm.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return GRPCAddressRegister.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package org.skywalking.apm.collector.worker.grpcserver;
import io.grpc.Server;
import io.grpc.netty.NettyServerBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.worker.config.GRPCConfig;
/**
* @author pengys5
*/
public enum GRPCServer {
INSTANCE;
private Logger logger = LogManager.getFormatterLogger(GRPCServer.class);
private Server server;
public void boot(ClusterWorkerContext clusterContext) throws Exception {
start(clusterContext);
blockUntilShutdown();
}
private void start(ClusterWorkerContext clusterContext) throws Exception {
int port = Integer.valueOf(GRPCConfig.GRPC.PORT);
NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port);
ServicesCreator.INSTANCE.boot(nettyServerBuilder, clusterContext);
server = nettyServerBuilder.build().start();
logger.info("Server started, listening on " + port);
}
/**
* Await termination on the main thread since the grpc library uses daemon threads.
*/
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
}
package org.skywalking.apm.collector.worker.grpcserver;
import io.grpc.BindableService;
import io.grpc.netty.NettyServerBuilder;
import java.util.ServiceLoader;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
/**
* @author pengys5
*/
public enum ServicesCreator {
INSTANCE;
private Logger logger = LogManager.getFormatterLogger(ServicesCreator.class);
public void boot(NettyServerBuilder nettyServerBuilder,
ClusterWorkerContext clusterContext) throws IllegalArgumentException, ProviderNotFoundException {
ServiceLoader<BindableService> grpcServiceLoader = java.util.ServiceLoader.load(BindableService.class);
for (BindableService service : grpcServiceLoader) {
logger.info("add grpc service %s into netty server builder ", service.getClass().getSimpleName());
nettyServerBuilder.addService(service);
((WorkerCaller)service).inject(clusterContext);
((WorkerCaller)service).preStart();
}
}
}
package org.skywalking.apm.collector.worker.grpcserver;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.AbstractWorker;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerRef;
import org.skywalking.apm.collector.worker.segment.SegmentReceiver;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.TraceSegmentServiceGrpc;
import org.skywalking.apm.network.proto.UpstreamSegment;
/**
* @author pengys5
*/
public class TraceSegmentServiceImpl extends TraceSegmentServiceGrpc.TraceSegmentServiceImplBase implements WorkerCaller {
private Logger logger = LogManager.getFormatterLogger(TraceSegmentServiceImpl.class);
private ClusterWorkerContext clusterWorkerContext;
private WorkerRef segmentReceiverWorkRef;
@Override public void preStart() throws ProviderNotFoundException {
segmentReceiverWorkRef = clusterWorkerContext.findProvider(SegmentReceiver.WorkerRole.INSTANCE).create(AbstractWorker.noOwner());
}
@Override public StreamObserver<UpstreamSegment> collect(StreamObserver<Downstream> responseObserver) {
return new StreamObserver<UpstreamSegment>() {
@Override public void onNext(UpstreamSegment segment) {
if (logger.isDebugEnabled()) {
StringBuffer globalTraceIds = new StringBuffer();
logger.debug("global trace ids count: %s", segment.getGlobalTraceIdsList().size());
segment.getGlobalTraceIdsList().forEach(globalTraceId -> {
globalTraceIds.append(globalTraceId).append(",");
});
logger.debug("receive segment, global trace ids: %s, segment byte size: %s", globalTraceIds, segment.getSegment().size());
try {
segmentReceiverWorkRef.tell(segment);
} catch (WorkerInvokeException e) {
onError(e);
}
}
}
@Override public void onError(Throwable throwable) {
logger.error(throwable.getMessage(), throwable);
}
@Override public void onCompleted() {
responseObserver.onCompleted();
}
};
}
@Override public void inject(ClusterWorkerContext clusterWorkerContext) {
this.clusterWorkerContext = clusterWorkerContext;
}
}
package org.skywalking.apm.collector.worker.grpcserver;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
/**
* @author pengys5
*/
public interface WorkerCaller {
void preStart() throws ProviderNotFoundException;
void inject(ClusterWorkerContext clusterWorkerContext);
}
package org.skywalking.apm.collector.worker.httpserver;
import com.google.gson.JsonObject;
import com.google.gson.JsonElement;
import java.io.IOException;
import java.util.Map;
import javax.servlet.ServletException;
......@@ -20,7 +20,7 @@ import org.skywalking.apm.collector.actor.WorkerNotFoundException;
* The <code>AbstractGet</code> implementations represent workers, which called by the server to allow a servlet to
* handle a GET request.
*
* <p>verride the {@link #onReceive(Map, JsonObject)} method to support a search service.
* <p>verride the {@link #onReceive(Map, JsonElement)} method to support a search service.
*
* @author pengys5
* @since v3.0-2017
......
package org.skywalking.apm.collector.worker.httpserver;
import com.google.gson.JsonObject;
import com.google.gson.JsonElement;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Map;
......@@ -31,6 +31,8 @@ public abstract class AbstractServlet extends AbstractLocalSyncWorker {
super(role, clusterContext, selfContext);
}
protected abstract Class<? extends JsonElement> responseClass();
/**
* Override this method to implementing business logic.
*
......@@ -42,12 +44,16 @@ public abstract class AbstractServlet extends AbstractLocalSyncWorker {
*/
@Override protected void onWork(Object parameter,
Object response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
JsonObject resJson = new JsonObject();
try {
JsonElement resJson = responseClass().newInstance();
onReceive((Map<String, String[]>)parameter, resJson);
onSuccessResponse((HttpServletResponse)response, resJson);
} catch (IOException e) {
logger.error(e.getMessage(), e);
} catch (InstantiationException e) {
logger.error(e.getMessage(), e);
} catch (IllegalAccessException e) {
logger.error(e.getMessage(), e);
}
}
......@@ -55,23 +61,22 @@ public abstract class AbstractServlet extends AbstractLocalSyncWorker {
* Override this method to implementing business logic.
*
* @param parameter {@link Map}, get the request parameter by key.
* @param response {@link JsonObject}, set the response data as json object.
* @param response {@link JsonElement}, set the response data as json object.
* @throws ArgumentsParseException if the key could not contains in parameter
* @throws WorkerInvokeException if any error is detected when call(or ask) worker
* @throws WorkerNotFoundException if the worker reference could not found in context.
*/
protected abstract void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException;
JsonElement response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException;
/**
* Set the worker response and the success status into the servlet response object
*
* @param response {@link HttpServletResponse} object that contains the response the servlet reply to the client
* @param resJson {@link JsonObject} object that contains the response from worker
* @param resJson {@link JsonElement} that contains the response from worker
* @throws IOException if any error is detected when the servlet handles the response.
*/
protected void onSuccessResponse(HttpServletResponse response, JsonObject resJson) throws IOException {
resJson.addProperty("isSuccess", true);
protected void onSuccessResponse(HttpServletResponse response, JsonElement resJson) throws IOException {
reply(response, resJson, HttpServletResponse.SC_OK);
}
......@@ -82,14 +87,15 @@ public abstract class AbstractServlet extends AbstractLocalSyncWorker {
* @param response {@link HttpServletResponse} object that contains the response the servlet reply to the client
*/
protected void onErrorResponse(Exception exception, HttpServletResponse response) {
JsonObject resJson = new JsonObject();
resJson.addProperty("isSuccess", false);
resJson.addProperty("reason", exception.getMessage());
response.setHeader("reason", exception.getMessage());
try {
reply(response, resJson, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
reply(response, responseClass().newInstance(), HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
} catch (IOException e) {
logger.error(e.getMessage(), e);
} catch (IllegalAccessException e) {
logger.error(e.getMessage(), e);
} catch (InstantiationException e) {
logger.error(e.getMessage(), e);
}
}
......@@ -97,11 +103,11 @@ public abstract class AbstractServlet extends AbstractLocalSyncWorker {
* Build the response head and body
*
* @param response {@link HttpServletResponse} object that contains the response the servlet reply to the client
* @param resJson {@link JsonObject} object that contains the response from worker
* @param resJson {@link JsonElement} that contains the response from worker
* @param status http status code
* @throws IOException if an input or output error is detected when the servlet handles the response
*/
private void reply(HttpServletResponse response, JsonObject resJson, int status) throws IOException {
private void reply(HttpServletResponse response, JsonElement resJson, int status) throws IOException {
response.setContentType("text/json");
response.setCharacterEncoding("utf-8");
response.setStatus(status);
......
package org.skywalking.apm.collector.worker.httpserver;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.BufferedReader;
import java.io.IOException;
......@@ -58,13 +59,13 @@ public abstract class AbstractStreamPost extends AbstractServlet {
* Override the default implementation, forbidden to call this method.
*
* @param parameter {@link Map}, get the request parameter by key.
* @param response {@link JsonObject}, set the response data as json object.
* @param response {@link JsonElement}, set the response data as json object.
* @throws ArgumentsParseException if the key could not contains in parameter
* @throws WorkerInvokeException if any error is detected when call(or ask) worker
* @throws WorkerNotFoundException if the worker reference could not found in context.
*/
@Override final protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
JsonElement response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
throw new WorkerInvokeException("Use the other method with buffer reader parameter");
}
......
......@@ -29,6 +29,5 @@ public enum HttpServer {
server.setHandler(servletContextHandler);
server.start();
server.join();
}
}
......@@ -9,12 +9,11 @@ import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.worker.RecordAnalysisMember;
import org.skywalking.apm.collector.worker.node.NodeCompIndex;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.entity.Span;
import org.skywalking.apm.collector.worker.segment.entity.tag.Tags;
import org.skywalking.apm.collector.worker.tools.ClientSpanIsLeafTools;
import org.skywalking.apm.collector.worker.tools.CollectionTools;
import org.skywalking.apm.collector.worker.tools.SpanPeersTools;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.SpanType;
import org.skywalking.apm.network.proto.TraceSegmentObject;
/**
* @author pengys5
......@@ -28,30 +27,29 @@ abstract class AbstractNodeCompAnalysis extends RecordAnalysisMember {
super(role, clusterContext, selfContext);
}
final void analyseSpans(Segment segment) {
List<Span> spanList = segment.getSpans();
final void analyseSpans(TraceSegmentObject segment) {
List<SpanObject> spanList = segment.getSpansList();
logger.debug("node analysis span isNotEmpty %s", CollectionTools.isNotEmpty(spanList));
if (CollectionTools.isNotEmpty(spanList)) {
logger.debug("node analysis span list SIZE: %s", spanList.size());
for (Span span : spanList) {
String kind = Tags.SPAN_KIND.get(span);
if (Tags.SPAN_KIND_CLIENT.equals(kind) && ClientSpanIsLeafTools.isLeaf(span.getSpanId(), spanList)) {
String peers = SpanPeersTools.INSTANCE.getPeers(span);
for (SpanObject span : spanList) {
if (SpanType.Exit.equals(span.getSpanType())) {
int peers = SpanPeersTools.INSTANCE.getPeers(span);
JsonObject compJsonObj = new JsonObject();
compJsonObj.addProperty(NodeCompIndex.PEERS, peers);
compJsonObj.addProperty(NodeCompIndex.NAME, Tags.COMPONENT.get(span));
compJsonObj.addProperty(NodeCompIndex.NAME, span.getComponent());
set(peers, compJsonObj);
} else if (Tags.SPAN_KIND_SERVER.equals(kind) && span.getParentSpanId() == -1) {
String peers = segment.getApplicationCode();
set(String.valueOf(peers), compJsonObj);
} else if (SpanType.Entry.equals(span.getSpanType()) && span.getParentSpanId() == -1) {
int peers = segment.getApplicationId();
JsonObject compJsonObj = new JsonObject();
compJsonObj.addProperty(NodeCompIndex.PEERS, peers);
compJsonObj.addProperty(NodeCompIndex.NAME, Tags.COMPONENT.get(span));
compJsonObj.addProperty(NodeCompIndex.NAME, span.getComponent());
set(peers, compJsonObj);
set(String.valueOf(peers), compJsonObj);
}
}
}
......
......@@ -10,9 +10,9 @@ import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.worker.Const;
import org.skywalking.apm.collector.worker.RecordAnalysisMember;
import org.skywalking.apm.collector.worker.node.NodeMappingIndex;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.entity.TraceSegmentRef;
import org.skywalking.apm.collector.worker.tools.CollectionTools;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.network.proto.TraceSegmentReference;
/**
* @author pengys5
......@@ -26,15 +26,15 @@ abstract class AbstractNodeMappingAnalysis extends RecordAnalysisMember {
super(role, clusterContext, selfContext);
}
final void analyseRefs(Segment segment, long timeSlice) {
List<TraceSegmentRef> segmentRefList = segment.getRefs();
final void analyseRefs(TraceSegmentObject segment, long timeSlice) {
List<TraceSegmentReference> segmentRefList = segment.getRefsList();
logger.debug("node mapping analysis refs isNotEmpty %s", CollectionTools.isNotEmpty(segmentRefList));
if (CollectionTools.isNotEmpty(segmentRefList)) {
logger.debug("node mapping analysis refs list SIZE: %s", segmentRefList.size());
for (TraceSegmentRef segmentRef : segmentRefList) {
String peers = Const.PEERS_FRONT_SPLIT + segmentRef.getPeerHost() + Const.PEERS_BEHIND_SPLIT;
String code = segment.getApplicationCode();
for (TraceSegmentReference segmentRef : segmentRefList) {
String peers = Const.PEERS_FRONT_SPLIT + segmentRef.getNetworkAddress() + Const.PEERS_BEHIND_SPLIT;
int code = segment.getApplicationId();
JsonObject nodeMappingJsonObj = new JsonObject();
nodeMappingJsonObj.addProperty(NodeMappingIndex.CODE, code);
......
......@@ -11,8 +11,8 @@ import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
import org.skywalking.apm.collector.worker.node.persistence.NodeCompAgg;
import org.skywalking.apm.collector.worker.segment.SegmentPost;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.SegmentReceiver;
import org.skywalking.apm.network.proto.TraceSegmentObject;
/**
* @author pengys5
......@@ -28,12 +28,12 @@ public class NodeCompAnalysis extends AbstractNodeCompAnalysis {
@Override
public void analyse(Object message) {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
Segment segment = segmentWithTimeSlice.getSegment();
if (message instanceof SegmentReceiver.SegmentWithTimeSlice) {
SegmentReceiver.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentReceiver.SegmentWithTimeSlice)message;
TraceSegmentObject segment = segmentWithTimeSlice.getSegment();
analyseSpans(segment);
} else {
logger.error("unhandled message, message instance must SegmentPost.SegmentWithTimeSlice, but is %s", message.getClass().toString());
logger.error("unhandled message, message instance must SegmentReceiver.SegmentWithTimeSlice, but is %s", message.getClass().toString());
}
}
......
......@@ -11,8 +11,8 @@ import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
import org.skywalking.apm.collector.worker.node.persistence.NodeMappingDayAgg;
import org.skywalking.apm.collector.worker.segment.SegmentPost;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.SegmentReceiver;
import org.skywalking.apm.network.proto.TraceSegmentObject;
/**
* @author pengys5
......@@ -28,12 +28,12 @@ public class NodeMappingDayAnalysis extends AbstractNodeMappingAnalysis {
@Override
public void analyse(Object message) {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
Segment segment = segmentWithTimeSlice.getSegment();
if (message instanceof SegmentReceiver.SegmentWithTimeSlice) {
SegmentReceiver.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentReceiver.SegmentWithTimeSlice)message;
TraceSegmentObject segment = segmentWithTimeSlice.getSegment();
analyseRefs(segment, segmentWithTimeSlice.getDay());
} else {
logger.error("unhandled message, message instance must SegmentPost.SegmentWithTimeSlice, but is %s", message.getClass().toString());
logger.error("unhandled message, message instance must SegmentReceiver.SegmentWithTimeSlice, but is %s", message.getClass().toString());
}
}
......
......@@ -11,8 +11,8 @@ import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
import org.skywalking.apm.collector.worker.node.persistence.NodeMappingHourAgg;
import org.skywalking.apm.collector.worker.segment.SegmentPost;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.SegmentReceiver;
import org.skywalking.apm.network.proto.TraceSegmentObject;
/**
* @author pengys5
......@@ -28,12 +28,12 @@ public class NodeMappingHourAnalysis extends AbstractNodeMappingAnalysis {
@Override
public void analyse(Object message) {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
Segment segment = segmentWithTimeSlice.getSegment();
if (message instanceof SegmentReceiver.SegmentWithTimeSlice) {
SegmentReceiver.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentReceiver.SegmentWithTimeSlice)message;
TraceSegmentObject segment = segmentWithTimeSlice.getSegment();
analyseRefs(segment, segmentWithTimeSlice.getHour());
} else {
logger.error("unhandled message, message instance must SegmentPost.SegmentWithTimeSlice, but is %s", message.getClass().toString());
logger.error("unhandled message, message instance must SegmentReceiver.SegmentWithTimeSlice, but is %s", message.getClass().toString());
}
}
......
......@@ -11,8 +11,8 @@ import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
import org.skywalking.apm.collector.worker.node.persistence.NodeMappingMinuteAgg;
import org.skywalking.apm.collector.worker.segment.SegmentPost;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.SegmentReceiver;
import org.skywalking.apm.network.proto.TraceSegmentObject;
/**
* @author pengys5
......@@ -28,12 +28,12 @@ public class NodeMappingMinuteAnalysis extends AbstractNodeMappingAnalysis {
@Override
public void analyse(Object message) {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
Segment segment = segmentWithTimeSlice.getSegment();
if (message instanceof SegmentReceiver.SegmentWithTimeSlice) {
SegmentReceiver.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentReceiver.SegmentWithTimeSlice)message;
TraceSegmentObject segment = segmentWithTimeSlice.getSegment();
analyseRefs(segment, segmentWithTimeSlice.getMinute());
} else {
logger.error("unhandled message, message instance must SegmentPost.SegmentWithTimeSlice, but is %s", message.getClass().toString());
logger.error("unhandled message, message instance must SegmentReceiver.SegmentWithTimeSlice, but is %s", message.getClass().toString());
}
}
......
package org.skywalking.apm.collector.worker.noderef;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.util.Arrays;
import java.util.Map;
......@@ -30,13 +31,17 @@ public class NodeRefResSumGetGroupWithTimeSlice extends AbstractGet {
super(role, clusterContext, selfContext);
}
@Override protected Class<? extends JsonElement> responseClass() {
return JsonObject.class;
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(NodeRefResSumGroupWithTimeSlice.WorkerRole.INSTANCE).create(this);
}
@Override protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
JsonElement response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
if (!parameter.containsKey("startTime") || !parameter.containsKey("endTime") || !parameter.containsKey("timeSliceType")) {
throw new ArgumentsParseException("the request parameter must contains startTime,endTime,timeSliceType");
}
......
......@@ -10,12 +10,11 @@ import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.worker.Const;
import org.skywalking.apm.collector.worker.RecordAnalysisMember;
import org.skywalking.apm.collector.worker.noderef.NodeRefIndex;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.entity.Span;
import org.skywalking.apm.collector.worker.segment.entity.tag.Tags;
import org.skywalking.apm.collector.worker.tools.ClientSpanIsLeafTools;
import org.skywalking.apm.collector.worker.tools.CollectionTools;
import org.skywalking.apm.collector.worker.tools.SpanPeersTools;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.SpanType;
import org.skywalking.apm.network.proto.TraceSegmentObject;
/**
* @author pengys5
......@@ -29,21 +28,21 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
super(role, clusterContext, selfContext);
}
final void analyseNodeRef(Segment segment, long timeSlice, long minute, long hour, long day,
final void analyseNodeRef(TraceSegmentObject segment, long timeSlice, long minute, long hour, long day,
int second) {
List<Span> spanList = segment.getSpans();
List<SpanObject> spanList = segment.getSpansList();
if (CollectionTools.isNotEmpty(spanList)) {
for (Span span : spanList) {
for (SpanObject span : spanList) {
JsonObject dataJsonObj = new JsonObject();
dataJsonObj.addProperty(NodeRefIndex.TIME_SLICE, timeSlice);
dataJsonObj.addProperty(NodeRefIndex.FRONT_IS_REAL_CODE, true);
dataJsonObj.addProperty(NodeRefIndex.BEHIND_IS_REAL_CODE, true);
if (Tags.SPAN_KIND_CLIENT.equals(Tags.SPAN_KIND.get(span)) && ClientSpanIsLeafTools.isLeaf(span.getSpanId(), spanList)) {
String front = segment.getApplicationCode();
if (SpanType.Exit.equals(span.getSpanType())) {
int front = segment.getApplicationId();
dataJsonObj.addProperty(NodeRefIndex.FRONT, front);
String behind = SpanPeersTools.INSTANCE.getPeers(span);
int behind = SpanPeersTools.INSTANCE.getPeers(span);
dataJsonObj.addProperty(NodeRefIndex.BEHIND, behind);
dataJsonObj.addProperty(NodeRefIndex.BEHIND_IS_REAL_CODE, false);
......@@ -51,9 +50,9 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
logger.debug("dag node ref: %s", dataJsonObj.toString());
set(id, dataJsonObj);
buildNodeRefResRecordData(id, span, minute, hour, day, second);
} else if (Tags.SPAN_KIND_SERVER.equals(Tags.SPAN_KIND.get(span))) {
if (span.getParentSpanId() == -1 && CollectionTools.isEmpty(segment.getRefs())) {
String behind = segment.getApplicationCode();
} else if (SpanType.Entry.equals(span.getSpanType())) {
if (span.getParentSpanId() == -1 && segment.getRefsCount() == 0) {
int behind = segment.getApplicationId();
dataJsonObj.addProperty(NodeRefIndex.BEHIND, behind);
String front = Const.USER_CODE;
......@@ -68,13 +67,13 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
}
}
private void buildNodeRefResRecordData(String nodeRefId, Span span, long minute, long hour, long day,
private void buildNodeRefResRecordData(String nodeRefId, SpanObject span, long minute, long hour, long day,
int second) {
AbstractNodeRefResSumAnalysis.NodeRefResRecord refResRecord = new AbstractNodeRefResSumAnalysis.NodeRefResRecord(minute, hour, day, second);
refResRecord.setStartTime(span.getStartTime());
refResRecord.setEndTime(span.getEndTime());
refResRecord.setNodeRefId(nodeRefId);
refResRecord.setError(Tags.ERROR.get(span));
refResRecord.setError(span.getIsError());
sendToResSumAnalysis(refResRecord);
}
......
......@@ -13,8 +13,8 @@ import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
import org.skywalking.apm.collector.worker.noderef.persistence.NodeRefDayAgg;
import org.skywalking.apm.collector.worker.segment.SegmentPost;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.SegmentReceiver;
import org.skywalking.apm.network.proto.TraceSegmentObject;
/**
* @author pengys5
......@@ -36,9 +36,9 @@ public class NodeRefDayAnalysis extends AbstractNodeRefAnalysis {
@Override
public void analyse(Object message) {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
Segment segment = segmentWithTimeSlice.getSegment();
if (message instanceof SegmentReceiver.SegmentWithTimeSlice) {
SegmentReceiver.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentReceiver.SegmentWithTimeSlice)message;
TraceSegmentObject segment = segmentWithTimeSlice.getSegment();
long minute = segmentWithTimeSlice.getMinute();
long hour = segmentWithTimeSlice.getHour();
......@@ -46,7 +46,7 @@ public class NodeRefDayAnalysis extends AbstractNodeRefAnalysis {
int second = segmentWithTimeSlice.getSecond();
analyseNodeRef(segment, segmentWithTimeSlice.getDay(), minute, hour, day, second);
} else {
logger.error("unhandled message, message instance must SegmentPost.SegmentWithTimeSlice, but is %s", message.getClass().toString());
logger.error("unhandled message, message instance must SegmentReceiver.SegmentWithTimeSlice, but is %s", message.getClass().toString());
}
}
......
......@@ -13,8 +13,8 @@ import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
import org.skywalking.apm.collector.worker.noderef.persistence.NodeRefHourAgg;
import org.skywalking.apm.collector.worker.segment.SegmentPost;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.SegmentReceiver;
import org.skywalking.apm.network.proto.TraceSegmentObject;
/**
* @author pengys5
......@@ -36,9 +36,9 @@ public class NodeRefHourAnalysis extends AbstractNodeRefAnalysis {
@Override
public void analyse(Object message) {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
Segment segment = segmentWithTimeSlice.getSegment();
if (message instanceof SegmentReceiver.SegmentWithTimeSlice) {
SegmentReceiver.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentReceiver.SegmentWithTimeSlice)message;
TraceSegmentObject segment = segmentWithTimeSlice.getSegment();
long minute = segmentWithTimeSlice.getMinute();
long hour = segmentWithTimeSlice.getHour();
......@@ -46,7 +46,7 @@ public class NodeRefHourAnalysis extends AbstractNodeRefAnalysis {
int second = segmentWithTimeSlice.getSecond();
analyseNodeRef(segment, segmentWithTimeSlice.getHour(), minute, hour, day, second);
} else {
logger.error("unhandled message, message instance must SegmentPost.SegmentWithTimeSlice, but is %s", message.getClass().toString());
logger.error("unhandled message, message instance must SegmentReceiver.SegmentWithTimeSlice, but is %s", message.getClass().toString());
}
}
......
......@@ -13,8 +13,8 @@ import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
import org.skywalking.apm.collector.worker.noderef.persistence.NodeRefMinuteAgg;
import org.skywalking.apm.collector.worker.segment.SegmentPost;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.SegmentReceiver;
import org.skywalking.apm.network.proto.TraceSegmentObject;
/**
* @author pengys5
......@@ -36,16 +36,16 @@ public class NodeRefMinuteAnalysis extends AbstractNodeRefAnalysis {
@Override
public void analyse(Object message) {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
Segment segment = segmentWithTimeSlice.getSegment();
if (message instanceof SegmentReceiver.SegmentWithTimeSlice) {
SegmentReceiver.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentReceiver.SegmentWithTimeSlice)message;
TraceSegmentObject segment = segmentWithTimeSlice.getSegment();
long minute = segmentWithTimeSlice.getMinute();
long hour = segmentWithTimeSlice.getHour();
long day = segmentWithTimeSlice.getDay();
int second = segmentWithTimeSlice.getSecond();
analyseNodeRef(segment, segmentWithTimeSlice.getMinute(), minute, hour, day, second);
} else {
logger.error("unhandled message, message instance must SegmentPost.SegmentWithTimeSlice, but is %s", message.getClass().toString());
logger.error("unhandled message, message instance must SegmentReceiver.SegmentWithTimeSlice, but is %s", message.getClass().toString());
}
}
......
package org.skywalking.apm.collector.worker.segment;
import java.io.IOException;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.skywalking.apm.collector.worker.config.EsConfig;
import org.skywalking.apm.collector.worker.storage.AbstractIndex;
import java.io.IOException;
/**
* @author pengys5
*/
public class SegmentIndex extends AbstractIndex {
public static final String INDEX = "segment_idx";
public static final String TRACE_SEGMENT_ID = "traceSegmentId";
public static final String SEGMENT_OBJ_BLOB = "segmentObjBlob";
@Override
public String index() {
......@@ -34,31 +35,20 @@ public class SegmentIndex extends AbstractIndex {
return XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
.startObject("traceSegmentId")
.field("type", "keyword")
.endObject()
.startObject("startTime")
.field("type", "date")
.field("index", "not_analyzed")
.endObject()
.startObject("endTime")
.field("type", "date")
.field("index", "not_analyzed")
.endObject()
.startObject("applicationCode")
.startObject(TRACE_SEGMENT_ID)
.field("type", "keyword")
.endObject()
.startObject("minute")
.startObject(TYPE_MINUTE)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startObject("hour")
.startObject(TYPE_HOUR)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startObject("day")
.startObject(TYPE_DAY)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startObject(SEGMENT_OBJ_BLOB)
.field("type", "binary")
.endObject()
.endObject()
.endObject();
......
package org.skywalking.apm.collector.worker.segment;
import com.google.gson.JsonObject;
import java.io.BufferedReader;
import java.io.IOException;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.List;
import org.apache.commons.codec.binary.Base64;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
......@@ -14,8 +16,7 @@ import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.globaltrace.analysis.GlobalTraceAnalysis;
import org.skywalking.apm.collector.worker.httpserver.AbstractStreamPost;
import org.skywalking.apm.collector.worker.httpserver.AbstractStreamPostProvider;
import org.skywalking.apm.collector.worker.grpcserver.AbstractReceiver;
import org.skywalking.apm.collector.worker.httpserver.ArgumentsParseException;
import org.skywalking.apm.collector.worker.node.analysis.NodeCompAnalysis;
import org.skywalking.apm.collector.worker.node.analysis.NodeMappingDayAnalysis;
......@@ -27,20 +28,21 @@ import org.skywalking.apm.collector.worker.noderef.analysis.NodeRefMinuteAnalysi
import org.skywalking.apm.collector.worker.segment.analysis.SegmentAnalysis;
import org.skywalking.apm.collector.worker.segment.analysis.SegmentCostAnalysis;
import org.skywalking.apm.collector.worker.segment.analysis.SegmentExceptionAnalysis;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.entity.SegmentAndJson;
import org.skywalking.apm.collector.worker.segment.entity.SegmentDeserialize;
import org.skywalking.apm.collector.worker.segment.entity.SegmentAndBase64;
import org.skywalking.apm.collector.worker.storage.AbstractTimeSlice;
import org.skywalking.apm.collector.worker.tools.DateTools;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.network.proto.UpstreamSegment;
import org.skywalking.apm.util.StringUtil;
/**
* @author pengys5
*/
public class SegmentPost extends AbstractStreamPost {
private static final Logger logger = LogManager.getFormatterLogger(SegmentPost.class);
public class SegmentReceiver extends AbstractReceiver {
private static final Logger logger = LogManager.getFormatterLogger(SegmentReceiver.class);
public SegmentPost(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
public SegmentReceiver(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -67,59 +69,47 @@ public class SegmentPost extends AbstractStreamPost {
* Read segment's buffer from buffer reader by stream mode. when finish read one segment then send to analysis.
* This method in there, so post servlet just can receive segments data.
*/
@Override protected void onReceive(BufferedReader bufferedReader,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
Segment segment;
try {
do {
int character;
StringBuilder builder = new StringBuilder();
while ((character = bufferedReader.read()) != ' ') {
if (character == -1) {
return;
}
builder.append((char)character);
}
int length = Integer.valueOf(builder.toString());
builder = new StringBuilder();
char[] buffer = new char[length];
int readLength = bufferedReader.read(buffer, 0, length);
if (readLength != length) {
logger.error("The actual data length was different from the length in data head! ");
return;
}
builder.append(buffer);
String segmentJsonStr = builder.toString();
segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentJsonStr);
tellWorkers(new SegmentAndJson(segment, segmentJsonStr));
@Override protected void onReceive(
Object request) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
if (request instanceof UpstreamSegment) {
UpstreamSegment upstreamSegment = (UpstreamSegment)request;
ByteString segmentByte = upstreamSegment.getSegment();
List<String> globalTraceIds = upstreamSegment.getGlobalTraceIdsList();
String segmentBase64 = new String(Base64.encodeBase64(segmentByte.toByteArray()));
TraceSegmentObject segment;
try {
segment = TraceSegmentObject.parseFrom(segmentByte);
} catch (InvalidProtocolBufferException e) {
throw new ArgumentsParseException(e.getMessage(), e);
}
while (segment != null);
} catch (IOException e) {
throw new ArgumentsParseException(e.getMessage(), e);
tellWorkers(new SegmentAndBase64(segment, segmentBase64), globalTraceIds);
}
}
private void tellWorkers(SegmentAndJson segmentAndJson) throws WorkerNotFoundException, WorkerInvokeException {
Segment segment = segmentAndJson.getSegment();
private void tellWorkers(
SegmentAndBase64 segmentAndBase64,
List<String> globalTraceIds) throws WorkerNotFoundException, WorkerInvokeException {
TraceSegmentObject segment = segmentAndBase64.getObject();
try {
validateData(segment);
} catch (IllegalArgumentException e) {
} catch (ArgumentsParseException e) {
logger.error(e.getMessage(), e);
return;
}
logger.debug("receive message instanceof TraceSegment, traceSegmentId is %s", segment.getTraceSegmentId());
SpanObject firstSpan = segment.getSpans(segment.getSpansCount() - 1);
long minuteSlice = DateTools.getMinuteSlice(segment.getStartTime());
long hourSlice = DateTools.getHourSlice(segment.getStartTime());
long daySlice = DateTools.getDaySlice(segment.getStartTime());
int second = DateTools.getSecond(segment.getStartTime());
long minuteSlice = DateTools.getMinuteSlice(firstSpan.getStartTime());
long hourSlice = DateTools.getHourSlice(firstSpan.getStartTime());
long daySlice = DateTools.getDaySlice(firstSpan.getStartTime());
int second = DateTools.getSecond(firstSpan.getStartTime());
logger.debug("minuteSlice: %s, hourSlice: %s, daySlice: %s, second:%s", minuteSlice, hourSlice, daySlice, second);
SegmentWithTimeSlice segmentWithTimeSlice = new SegmentWithTimeSlice(segment, minuteSlice, hourSlice, daySlice, second);
getSelfContext().lookup(SegmentAnalysis.Role.INSTANCE).tell(segmentAndJson);
SegmentWithTimeSlice segmentWithTimeSlice = new SegmentWithTimeSlice(segment, globalTraceIds, minuteSlice, hourSlice, daySlice, second);
getSelfContext().lookup(SegmentAnalysis.Role.INSTANCE).tell(segmentAndBase64);
getSelfContext().lookup(SegmentCostAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
getSelfContext().lookup(GlobalTraceAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
......@@ -145,29 +135,31 @@ public class SegmentPost extends AbstractStreamPost {
getSelfContext().lookup(NodeMappingDayAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
}
private void validateData(Segment segment) {
private void validateData(TraceSegmentObject segment) throws ArgumentsParseException {
if (StringUtil.isEmpty(segment.getTraceSegmentId())) {
throw new IllegalArgumentException("traceSegmentId required");
throw new ArgumentsParseException("traceSegmentId required");
}
if (0 == segment.getStartTime()) {
throw new IllegalArgumentException("startTime required");
if (segment.getSpansCount() < 1) {
throw new ArgumentsParseException("must contain at least one span");
}
}
public static class Factory extends AbstractStreamPostProvider<SegmentPost> {
@Override
public String servletPath() {
return "/segments";
SpanObject firstSpan = segment.getSpans(segment.getSpansCount() - 1);
if (firstSpan.getSpanId() != 0 && firstSpan.getParentSpanId() != -1) {
throw new ArgumentsParseException("first span id must equals 0 and parent span id must equals -1");
}
if (0 == firstSpan.getStartTime()) {
throw new ArgumentsParseException("startTime required");
}
}
public static class Factory extends AbstractLocalSyncWorkerProvider<SegmentReceiver> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public SegmentPost workerInstance(ClusterWorkerContext clusterContext) {
return new SegmentPost(role(), clusterContext, new LocalWorkerContext());
public SegmentReceiver workerInstance(ClusterWorkerContext clusterContext) {
return new SegmentReceiver(role(), clusterContext, new LocalWorkerContext());
}
}
......@@ -176,7 +168,7 @@ public class SegmentPost extends AbstractStreamPost {
@Override
public String roleName() {
return SegmentPost.class.getSimpleName();
return SegmentReceiver.class.getSimpleName();
}
@Override
......@@ -186,15 +178,23 @@ public class SegmentPost extends AbstractStreamPost {
}
public static class SegmentWithTimeSlice extends AbstractTimeSlice {
private final Segment segment;
private final TraceSegmentObject segment;
private final List<String> globalTraceIds;
public SegmentWithTimeSlice(Segment segment, long minute, long hour, long day, int second) {
public SegmentWithTimeSlice(TraceSegmentObject segment, List<String> globalTraceIds, long minute, long hour,
long day, int second) {
super(minute, hour, day, second);
this.segment = segment;
this.globalTraceIds = globalTraceIds;
}
public Segment getSegment() {
public TraceSegmentObject getSegment() {
return segment;
}
public List<String> getGlobalTraceIds() {
return globalTraceIds;
}
}
}
package org.skywalking.apm.collector.worker.segment;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.util.Arrays;
import java.util.Map;
......@@ -30,13 +31,17 @@ public class SegmentTopGet extends AbstractGet {
super(role, clusterContext, selfContext);
}
@Override protected Class<? extends JsonElement> responseClass() {
return JsonObject.class;
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(SegmentTopSearch.WorkerRole.INSTANCE).create(this);
}
@Override protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
JsonElement response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
if (!parameter.containsKey("startTime") || !parameter.containsKey("endTime") || !parameter.containsKey("from") || !parameter.containsKey("limit")) {
throw new ArgumentsParseException("the request parameter must contains startTime, endTime, from, limit");
}
......
......@@ -13,7 +13,7 @@ import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.RecordAnalysisMember;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
import org.skywalking.apm.collector.worker.segment.entity.SegmentAndJson;
import org.skywalking.apm.collector.worker.segment.entity.SegmentAndBase64;
import org.skywalking.apm.collector.worker.segment.persistence.SegmentSave;
/**
......@@ -34,11 +34,11 @@ public class SegmentAnalysis extends RecordAnalysisMember {
@Override
public void analyse(Object message) {
if (message instanceof SegmentAndJson) {
SegmentAndJson segmentAndJson = (SegmentAndJson)message;
if (message instanceof SegmentAndBase64) {
SegmentAndBase64 segmentAndBase64 = (SegmentAndBase64)message;
try {
getSelfContext().lookup(SegmentSave.Role.INSTANCE).tell(segmentAndJson);
getSelfContext().lookup(SegmentSave.Role.INSTANCE).tell(segmentAndBase64);
} catch (WorkerInvokeException | WorkerNotFoundException e) {
e.printStackTrace();
logger.error(e.getMessage(), e);
......
......@@ -14,11 +14,11 @@ import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.RecordAnalysisMember;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
import org.skywalking.apm.collector.worker.segment.SegmentCostIndex;
import org.skywalking.apm.collector.worker.segment.SegmentPost;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.entity.Span;
import org.skywalking.apm.collector.worker.segment.SegmentReceiver;
import org.skywalking.apm.collector.worker.segment.persistence.SegmentCostSave;
import org.skywalking.apm.collector.worker.tools.CollectionTools;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.TraceSegmentObject;
/**
* @author pengys5
......@@ -38,36 +38,37 @@ public class SegmentCostAnalysis extends RecordAnalysisMember {
@Override
public void analyse(Object message) {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
Segment segment = segmentWithTimeSlice.getSegment();
if (message instanceof SegmentReceiver.SegmentWithTimeSlice) {
SegmentReceiver.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentReceiver.SegmentWithTimeSlice)message;
TraceSegmentObject segment = segmentWithTimeSlice.getSegment();
if (CollectionTools.isNotEmpty(segment.getSpans())) {
for (Span span : segment.getSpans()) {
if (CollectionTools.isNotEmpty(segment.getSpansList())) {
for (SpanObject span : segment.getSpansList()) {
if (span.getParentSpanId() == -1) {
JsonObject dataJsonObj = new JsonObject();
dataJsonObj.addProperty(SegmentCostIndex.SEG_ID, segment.getTraceSegmentId());
dataJsonObj.addProperty(SegmentCostIndex.START_TIME, span.getStartTime());
dataJsonObj.addProperty(SegmentCostIndex.END_TIME, span.getEndTime());
if (segment.getRelatedGlobalTraces().get() != null && segment.getRelatedGlobalTraces().get().size() > 0) {
dataJsonObj.addProperty(SegmentCostIndex.GLOBAL_TRACE_ID, segment.getRelatedGlobalTraces().get().get(0));
}
dataJsonObj.addProperty(SegmentCostIndex.OPERATION_NAME, span.getOperationName());
dataJsonObj.addProperty(SegmentCostIndex.TIME_SLICE, segmentWithTimeSlice.getMinute());
for (String globalTraceId : segmentWithTimeSlice.getGlobalTraceIds()) {
segment.getGlobalTraceIdsList();
JsonObject dataJsonObj = new JsonObject();
dataJsonObj.addProperty(SegmentCostIndex.SEG_ID, segment.getTraceSegmentId());
dataJsonObj.addProperty(SegmentCostIndex.START_TIME, span.getStartTime());
dataJsonObj.addProperty(SegmentCostIndex.END_TIME, span.getEndTime());
dataJsonObj.addProperty(SegmentCostIndex.GLOBAL_TRACE_ID, globalTraceId);
dataJsonObj.addProperty(SegmentCostIndex.OPERATION_NAME, span.getOperationName());
dataJsonObj.addProperty(SegmentCostIndex.TIME_SLICE, segmentWithTimeSlice.getMinute());
long startTime = span.getStartTime();
long endTime = span.getEndTime();
long cost = endTime - startTime;
if (cost == 0) {
cost = 1;
long startTime = span.getStartTime();
long endTime = span.getEndTime();
long cost = endTime - startTime;
if (cost == 0) {
cost = 1;
}
dataJsonObj.addProperty(SegmentCostIndex.COST, cost);
set(segment.getTraceSegmentId(), dataJsonObj);
}
dataJsonObj.addProperty(SegmentCostIndex.COST, cost);
set(segment.getTraceSegmentId(), dataJsonObj);
}
}
}
} else {
logger.error("unhandled message, message instance must SegmentPost.SegmentWithTimeSlice, but is %s", message.getClass().toString());
logger.error("unhandled message, message instance must SegmentReceiver.SegmentWithTimeSlice, but is %s", message.getClass().toString());
}
}
......
......@@ -16,13 +16,12 @@ import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.RecordAnalysisMember;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
import org.skywalking.apm.collector.worker.segment.SegmentExceptionIndex;
import org.skywalking.apm.collector.worker.segment.SegmentPost;
import org.skywalking.apm.collector.worker.segment.entity.LogData;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.entity.Span;
import org.skywalking.apm.collector.worker.segment.entity.tag.Tags;
import org.skywalking.apm.collector.worker.segment.SegmentReceiver;
import org.skywalking.apm.collector.worker.segment.persistence.SegmentExceptionSave;
import org.skywalking.apm.collector.worker.tools.CollectionTools;
import org.skywalking.apm.network.proto.LogMessage;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.TraceSegmentObject;
/**
* @author pengys5
......@@ -42,13 +41,13 @@ public class SegmentExceptionAnalysis extends RecordAnalysisMember {
@Override
public void analyse(Object message) {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
Segment segment = segmentWithTimeSlice.getSegment();
if (message instanceof SegmentReceiver.SegmentWithTimeSlice) {
SegmentReceiver.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentReceiver.SegmentWithTimeSlice)message;
TraceSegmentObject segment = segmentWithTimeSlice.getSegment();
if (CollectionTools.isNotEmpty(segment.getSpans())) {
for (Span span : segment.getSpans()) {
boolean isError = Tags.ERROR.get(span);
if (CollectionTools.isNotEmpty(segment.getSpansList())) {
for (SpanObject span : segment.getSpansList()) {
boolean isError = span.getIsError();
JsonObject dataJsonObj = new JsonObject();
dataJsonObj.addProperty(SegmentExceptionIndex.IS_ERROR, isError);
......@@ -56,11 +55,11 @@ public class SegmentExceptionAnalysis extends RecordAnalysisMember {
JsonArray errorKind = new JsonArray();
if (isError) {
List<LogData> logDataList = span.getLogs();
for (LogData logData : logDataList) {
if (logData.getFields().containsKey("error.kind")) {
errorKind.add(String.valueOf(logData.getFields().get("error.kind")));
}
List<LogMessage> logMessages = span.getLogsList();
for (LogMessage logMessage : logMessages) {
// if (logMessage.getFields().containsKey("error.kind")) {
// errorKind.add(String.valueOf(logData.getFields().get("error.kind")));
// }
}
}
dataJsonObj.add(SegmentExceptionIndex.ERROR_KIND, errorKind);
......@@ -68,7 +67,7 @@ public class SegmentExceptionAnalysis extends RecordAnalysisMember {
}
}
} else {
logger.error("unhandled message, message instance must SegmentPost.SegmentWithTimeSlice, but is %s", message.getClass().toString());
logger.error("unhandled message, message instance must SegmentReceiver.SegmentWithTimeSlice, but is %s", message.getClass().toString());
}
}
......
package org.skywalking.apm.collector.worker.segment.entity;
import com.google.gson.TypeAdapter;
import com.google.gson.annotations.JsonAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
/**
* @author pengys5
*/
@JsonAdapter(GlobalTraceId.Serializer.class)
public class GlobalTraceId {
public GlobalTraceId() {
globalTraceIds = new LinkedList<>();
}
private LinkedList<String> globalTraceIds;
public LinkedList<String> get() {
return globalTraceIds;
}
public static class Serializer extends TypeAdapter<GlobalTraceId> {
@Override public void write(JsonWriter out, GlobalTraceId value) throws IOException {
List<String> globalTraceIds = value.globalTraceIds;
if (globalTraceIds.size() > 0) {
out.beginArray();
for (String globalTraceId : globalTraceIds) {
out.value(globalTraceId);
}
out.endArray();
}
}
@Override public GlobalTraceId read(JsonReader in) throws IOException {
GlobalTraceId globalTraceId = new GlobalTraceId();
in.beginArray();
try {
while (in.hasNext()) {
globalTraceId.get().add(in.nextString());
}
} finally {
in.endArray();
}
return globalTraceId;
}
}
}
package org.skywalking.apm.collector.worker.segment.entity;
import com.google.gson.annotations.SerializedName;
import java.util.Map;
/**
* @author pengys5
*/
public class LogData {
@SerializedName("tm")
private long time;
@SerializedName("fi")
private Map<String, String> fields;
public long getTime() {
return time;
}
public Map<String, String> getFields() {
return fields;
}
}
package org.skywalking.apm.collector.worker.segment.entity;
import com.google.gson.annotations.SerializedName;
import java.util.List;
/**
* @author pengys5
*/
public class Segment {
@SerializedName("ts")
private String traceSegmentId;
@SerializedName("st")
private long startTime;
@SerializedName("et")
private long endTime;
@SerializedName("rs")
private List<TraceSegmentRef> refs;
@SerializedName("ss")
private List<Span> spans;
@SerializedName("ac")
private String applicationCode;
@SerializedName("gt")
private GlobalTraceId relatedGlobalTraces;
public String getTraceSegmentId() {
return traceSegmentId;
}
public long getStartTime() {
return startTime;
}
public long getEndTime() {
return endTime;
}
public String getApplicationCode() {
return applicationCode;
}
public List<TraceSegmentRef> getRefs() {
return refs;
}
public List<Span> getSpans() {
return spans;
}
public GlobalTraceId getRelatedGlobalTraces() {
return relatedGlobalTraces;
}
}
package org.skywalking.apm.collector.worker.segment.entity;
import com.google.gson.JsonObject;
import org.skywalking.apm.collector.worker.segment.SegmentIndex;
import org.skywalking.apm.network.proto.TraceSegmentObject;
/**
* @author pengys5
*/
public class SegmentAndBase64 {
private final TraceSegmentObject object;
private final String base64;
public SegmentAndBase64(TraceSegmentObject object, String base64) {
this.object = object;
this.base64 = base64;
}
public TraceSegmentObject getObject() {
return object;
}
public String getBase64() {
return base64;
}
public String getSegmentJsonStr() {
JsonObject segmentJson = new JsonObject();
segmentJson.addProperty(SegmentIndex.TRACE_SEGMENT_ID, object.getTraceSegmentId());
segmentJson.addProperty(SegmentIndex.SEGMENT_OBJ_BLOB, base64);
return segmentJson.toString();
}
}
package org.skywalking.apm.collector.worker.segment.entity;
/**
* @author pengys5
*/
public class SegmentAndJson {
private final Segment segment;
private final String jsonStr;
public SegmentAndJson(Segment segment, String jsonStr) {
this.segment = segment;
this.jsonStr = jsonStr;
}
public Segment getSegment() {
return segment;
}
public String getJsonStr() {
return jsonStr;
}
}
package org.skywalking.apm.collector.worker.segment.entity;
import com.google.gson.Gson;
import com.sun.org.apache.xml.internal.security.utils.Base64;
import java.io.IOException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.network.proto.TraceSegmentObject;
/**
* The <code>SegmentDeserialize</code> provides single segment json string deserialize and segment array file
......@@ -13,17 +16,21 @@ import java.io.IOException;
public enum SegmentDeserialize {
INSTANCE;
private final Gson gson = new Gson();
private final Logger logger = LogManager.getFormatterLogger(SegmentDeserialize.class);
/**
* Single segment json string deserialize.
* Segment object binary value as a base64 encoded string deserialize.
*
* @param singleSegmentJsonStr a segment json string
* @return an {@link Segment}
* @throws IOException if json string illegal or file broken.
* @param segmentObjBlob , to be a binary value as a base64 encoded string
* @return an {@link TraceSegmentObject}
*/
public Segment deserializeSingle(String singleSegmentJsonStr) throws IOException {
Segment segment = gson.fromJson(singleSegmentJsonStr, Segment.class);
return segment;
public TraceSegmentObject deserializeSingle(String segmentObjBlob) {
try {
byte[] decode = Base64.decode(segmentObjBlob);
return TraceSegmentObject.parseFrom(decode);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return null;
}
}
package org.skywalking.apm.collector.worker.segment.entity;
import com.google.gson.annotations.SerializedName;
import java.util.List;
import java.util.Map;
/**
* @author pengys5
*/
public class Span {
@SerializedName("si")
private int spanId;
@SerializedName("ps")
private int parentSpanId;
@SerializedName("st")
private long startTime;
@SerializedName("et")
private long endTime;
@SerializedName("on")
private String operationName;
@SerializedName("ts")
private Map<String, String> tagsWithStr;
@SerializedName("tb")
private Map<String, Boolean> tagsWithBool;
@SerializedName("ti")
private Map<String, Integer> tagsWithInt;
@SerializedName("lo")
private List<LogData> logs;
public int getSpanId() {
return spanId;
}
public int getParentSpanId() {
return parentSpanId;
}
public long getStartTime() {
return startTime;
}
public long getEndTime() {
return endTime;
}
public String getOperationName() {
return operationName;
}
public String getStrTag(String key) {
return tagsWithStr.get(key);
}
public Boolean getBoolTag(String key) {
return tagsWithBool.get(key);
}
public Integer getIntTag(String key) {
return tagsWithInt.get(key);
}
public List<LogData> getLogs() {
return logs;
}
}
package org.skywalking.apm.collector.worker.segment.entity;
import com.google.gson.annotations.SerializedName;
/**
* @author pengys5
*/
public class TraceSegmentRef {
@SerializedName("ts")
private String traceSegmentId;
@SerializedName("si")
private int spanId = -1;
@SerializedName("ac")
private String applicationCode;
@SerializedName("ph")
private String peerHost;
public String getTraceSegmentId() {
return traceSegmentId;
}
public int getSpanId() {
return spanId;
}
public String getApplicationCode() {
return applicationCode;
}
public String getPeerHost() {
return peerHost;
}
}
package org.skywalking.apm.collector.worker.segment.entity.tag;
import org.skywalking.apm.collector.worker.segment.entity.Span;
public abstract class AbstractTag<T> {
/**
* The key of this Tag.
*/
protected final String key;
public AbstractTag(String tagKey) {
this.key = tagKey;
}
public abstract T get(Span span);
}
package org.skywalking.apm.collector.worker.segment.entity.tag;
import org.skywalking.apm.collector.worker.segment.entity.Span;
/**
* Do the same thing as {@link StringTag}, just with a {@link Boolean} value.
* <p>
* Created by wusheng on 2017/2/17.
*/
public class BooleanTag extends AbstractTag<Boolean> {
private boolean defaultValue;
public BooleanTag(String key, boolean defaultValue) {
super(key);
this.defaultValue = defaultValue;
}
/**
* Get a tag value, type of {@link Boolean}. After akka-message/serialize, all tags values are type of {@link
* String}, convert to {@link Boolean}, if necessary.
*
* @param span
* @return tag value
*/
@Override
public Boolean get(Span span) {
Boolean tagValue = span.getBoolTag(super.key);
if (tagValue == null) {
return defaultValue;
} else {
return tagValue;
}
}
}
package org.skywalking.apm.collector.worker.segment.entity.tag;
import org.skywalking.apm.collector.worker.segment.entity.Span;
/**
* Do the same thing as {@link StringTag}, just with a {@link Integer} value.
* <p>
* Created by wusheng on 2017/2/18.
*/
public class IntTag extends AbstractTag<Integer> {
public IntTag(String key) {
super(key);
}
/**
* Get a tag value, type of {@link Integer}.
* After akka-message/serialize, all tags values are type of {@link String}, convert to {@link Integer}, if necessary.
*
* @param span
* @return tag value
*/
@Override
public Integer get(Span span) {
Integer tagValue = span.getIntTag(super.key);
if (tagValue == null) {
return null;
} else {
return tagValue;
}
}
}
package org.skywalking.apm.collector.worker.segment.entity.tag;
import org.skywalking.apm.collector.worker.segment.entity.Span;
/**
* Do the same thing as {@link StringTag}, just with a {@link Short} value.
* <p>
* Created by wusheng on 2017/2/17.
*/
public class ShortTag extends AbstractTag<Short> {
public ShortTag(String key) {
super(key);
}
/**
* Get a tag value, type of {@link Short}.
* After akka-message/serialize, all tags values are type of {@link String}, convert to {@link Short}, if necessary.
*
* @param span
* @return tag value
*/
@Override
public Short get(Span span) {
Integer tagValue = span.getIntTag(super.key);
if (tagValue == null) {
return null;
} else {
return Short.valueOf(tagValue.toString());
}
}
}
package org.skywalking.apm.collector.worker.segment.entity.tag;
import org.skywalking.apm.collector.worker.segment.entity.Span;
/**
* A subclass of {@link AbstractTag},
* represent a tag with a {@link String} value.
* <p>
* Created by wusheng on 2017/2/17.
*/
public class StringTag extends AbstractTag<String> {
public StringTag(String tagKey) {
super(tagKey);
}
@Override
public String get(Span span) {
return span.getStrTag(super.key);
}
}
package org.skywalking.apm.collector.worker.segment.entity.tag;
import org.skywalking.apm.collector.worker.segment.entity.Span;
/**
* The span tags are supported by sky-walking engine.
* As default, all tags will be stored, but these ones have particular meanings.
* <p>
* Created by wusheng on 2017/2/17.
*/
public final class Tags {
private Tags() {
}
/**
* URL records the url of the incoming request.
*/
public static final StringTag URL = new StringTag("url");
/**
* STATUS_CODE records the http status code of the response.
*/
public static final IntTag STATUS_CODE = new IntTag("status_code");
/**
* SPAN_KIND hints at the relationship between spans, e.g. client/server.
*/
public static final StringTag SPAN_KIND = new StringTag("span.kind");
/**
* A constant for setting the span kind to indicate that it represents a server span.
*/
public static final String SPAN_KIND_SERVER = "server";
/**
* A constant for setting the span kind to indicate that it represents a client span.
*/
public static final String SPAN_KIND_CLIENT = "client";
/**
* SPAN_LAYER represents the kind of span.
* <p>
* e.g.
* db=database;
* rpc=Remote Procedure Call Framework, like motan, thift;
* nosql=something like redis/memcache
*/
public static final class SPAN_LAYER {
private static StringTag SPAN_LAYER_TAG = new StringTag("span.layer");
public static String get(Span span) {
return SPAN_LAYER_TAG.get(span);
}
}
/**
* COMPONENT is a low-cardinality identifier of the module, library, or package that is instrumented.
* Like dubbo/dubbox/motan
*/
public static final StringTag COMPONENT = new StringTag("component");
/**
* ERROR indicates whether a Span ended in an error state.
*/
public static final BooleanTag ERROR = new BooleanTag("error", false);
/**
* PEER_HOST records host address (ip:port, or ip1:port1,ip2:port2) of the peer, maybe IPV4, IPV6 or hostname.
*/
public static final StringTag PEER_HOST = new StringTag("peer.host");
/**
* PEER_PORT records remote port of the peer
*/
public static final IntTag PEER_PORT = new IntTag("peer.port");
/**
* PEERS records multiple host address and port of remote
*/
public static final StringTag PEERS = new StringTag("peers");
/**
* DB_TYPE records database type, such as sql, redis, cassandra and so on.
*/
public static final StringTag DB_TYPE = new StringTag("db.type");
/**
* DB_INSTANCE records database instance name.
*/
public static final StringTag DB_INSTANCE = new StringTag("db.instance");
/**
* DB_STATEMENT records the sql statement of the database access.
*/
public static final StringTag DB_STATEMENT = new StringTag("db.statement");
}
package org.skywalking.apm.collector.worker.segment.persistence;
import com.google.gson.JsonObject;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
......@@ -13,7 +14,7 @@ import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.PersistenceMember;
import org.skywalking.apm.collector.worker.config.CacheSizeConfig;
import org.skywalking.apm.collector.worker.segment.SegmentIndex;
import org.skywalking.apm.collector.worker.segment.entity.SegmentAndJson;
import org.skywalking.apm.collector.worker.segment.entity.SegmentAndBase64;
import org.skywalking.apm.collector.worker.storage.AbstractIndex;
import org.skywalking.apm.collector.worker.storage.EsClient;
import org.skywalking.apm.collector.worker.storage.PersistenceWorkerListener;
......@@ -46,11 +47,11 @@ public class SegmentSave extends PersistenceMember<SegmentPersistenceData, Segme
}
@Override final public void analyse(Object message) {
if (message instanceof SegmentAndJson) {
SegmentAndJson segmentAndJson = (SegmentAndJson)message;
if (message instanceof SegmentAndBase64) {
SegmentAndBase64 segmentAndBase64 = (SegmentAndBase64)message;
SegmentPersistenceData data = getPersistenceData();
data.hold();
data.getOrCreate(segmentAndJson.getSegment().getTraceSegmentId()).setSegmentStr(segmentAndJson.getJsonStr());
data.getOrCreate(segmentAndBase64.getObject().getTraceSegmentId()).setSegmentStr(segmentAndBase64.getSegmentJsonStr());
if (data.size() >= CacheSizeConfig.Cache.Persistence.SIZE) {
persistence(data.asMap());
}
......
......@@ -2,8 +2,8 @@ package org.skywalking.apm.collector.worker.segment.persistence;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.util.List;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
......@@ -25,10 +25,10 @@ import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.segment.SegmentCostIndex;
import org.skywalking.apm.collector.worker.segment.SegmentExceptionIndex;
import org.skywalking.apm.collector.worker.segment.SegmentIndex;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.entity.SegmentDeserialize;
import org.skywalking.apm.collector.worker.storage.EsClient;
import org.skywalking.apm.collector.worker.tools.CollectionTools;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.util.StringUtil;
/**
......@@ -102,15 +102,11 @@ public class SegmentTopSearch extends AbstractLocalSyncWorker {
topSegmentJson.addProperty(SegmentCostIndex.OPERATION_NAME, (String)searchHit.getSource().get(SegmentCostIndex.OPERATION_NAME));
topSegmentJson.addProperty(SegmentCostIndex.COST, (Number)searchHit.getSource().get(SegmentCostIndex.COST));
String segmentSource = EsClient.INSTANCE.getClient().prepareGet(SegmentIndex.INDEX, SegmentIndex.TYPE_RECORD, segId).get().getSourceAsString();
logger().debug("segmentSource:" + segmentSource);
Segment segment;
try {
segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentSource);
} catch (IOException e) {
throw new WorkerException(e.getMessage(), e);
}
List<String> distributedTraceIdList = segment.getRelatedGlobalTraces().get();
GetResponse getResponse = EsClient.INSTANCE.getClient().prepareGet(SegmentIndex.INDEX, SegmentIndex.TYPE_RECORD, segId).get();
String segmentObjBlob = (String)getResponse.getSource().get(SegmentIndex.SEGMENT_OBJ_BLOB);
TraceSegmentObject segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentObjBlob);
List<String> distributedTraceIdList = segment.getGlobalTraceIdsList();
JsonArray distributedTraceIdArray = new JsonArray();
if (CollectionTools.isNotEmpty(distributedTraceIdList)) {
......
package org.skywalking.apm.collector.worker.span;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.util.Arrays;
import java.util.Map;
......@@ -30,13 +31,17 @@ public class SpanGetWithId extends AbstractGet {
super(role, clusterContext, selfContext);
}
@Override protected Class<? extends JsonElement> responseClass() {
return JsonObject.class;
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(SpanSearchWithId.WorkerRole.INSTANCE).create(this);
}
@Override protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
JsonElement response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
if (!parameter.containsKey("segId") || !parameter.containsKey("spanId")) {
throw new ArgumentsParseException("the request parameter must contains segId, spanId");
}
......
......@@ -2,7 +2,6 @@ package org.skywalking.apm.collector.worker.span.persistence;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.util.List;
import org.elasticsearch.action.get.GetResponse;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker;
......@@ -15,10 +14,10 @@ import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.Const;
import org.skywalking.apm.collector.worker.segment.SegmentIndex;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.entity.SegmentDeserialize;
import org.skywalking.apm.collector.worker.segment.entity.Span;
import org.skywalking.apm.collector.worker.storage.GetResponseFromEs;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.TraceSegmentObject;
/**
* @author pengys5
......@@ -36,18 +35,15 @@ public class SpanSearchWithId extends AbstractLocalSyncWorker {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity)request;
GetResponse getResponse = GetResponseFromEs.INSTANCE.get(SegmentIndex.INDEX, SegmentIndex.TYPE_RECORD, search.segId);
Segment segment;
try {
segment = SegmentDeserialize.INSTANCE.deserializeSingle(getResponse.getSourceAsString());
} catch (IOException e) {
throw new WorkerException(e.getMessage(), e);
}
List<Span> spanList = segment.getSpans();
String segmentObjBlob = (String)getResponse.getSource().get(SegmentIndex.SEGMENT_OBJ_BLOB);
TraceSegmentObject segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentObjBlob);
List<SpanObject> spanList = segment.getSpansList();
getResponse.getSource();
JsonObject dataJson = new JsonObject();
for (Span span : spanList) {
for (SpanObject span : spanList) {
if (String.valueOf(span.getSpanId()).equals(search.spanId)) {
String spanJsonStr = gson.toJson(span);
dataJson = gson.fromJson(spanJsonStr, JsonObject.class);
......
package org.skywalking.apm.collector.worker.tools;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.worker.segment.entity.Span;
import org.skywalking.apm.collector.worker.segment.entity.tag.Tags;
import java.util.List;
/**
* @author pengys5
*/
public class ClientSpanIsLeafTools {
private static final Logger logger = LogManager.getFormatterLogger(ClientSpanIsLeafTools.class);
public static boolean isLeaf(int spanId, List<Span> spanList) {
boolean isLeaf = true;
for (Span span : spanList) {
if (span.getParentSpanId() == spanId && Tags.SPAN_KIND_CLIENT.equals(Tags.SPAN_KIND.get(span))) {
logger.debug("current spanId=%s, merge spanId=%s, span kind=%s", spanId, span.getSpanId(), Tags.SPAN_KIND.get(span));
isLeaf = false;
}
}
return isLeaf;
}
}
package org.skywalking.apm.collector.worker.tools;
import org.skywalking.apm.util.StringUtil;
import org.skywalking.apm.collector.worker.Const;
import org.skywalking.apm.collector.worker.segment.entity.Span;
import org.skywalking.apm.collector.worker.segment.entity.tag.Tags;
import org.skywalking.apm.network.proto.SpanObject;
/**
* @author pengys5
......@@ -11,13 +8,11 @@ import org.skywalking.apm.collector.worker.segment.entity.tag.Tags;
public enum SpanPeersTools {
INSTANCE;
public String getPeers(Span span) {
if (StringUtil.isEmpty(Tags.PEERS.get(span))) {
String host = Tags.PEER_HOST.get(span);
int port = Tags.PEER_PORT.get(span);
return Const.PEERS_FRONT_SPLIT + host + ":" + port + Const.PEERS_BEHIND_SPLIT;
public int getPeers(SpanObject span) {
if (span.getPeerId() == 0) {
return 0; //TODO exchange peer to peer id
} else {
return Const.PEERS_FRONT_SPLIT + Tags.PEERS.get(span) + Const.PEERS_BEHIND_SPLIT;
return span.getPeerId();
}
}
}
package org.skywalking.apm.collector.worker.tracedag;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.util.Arrays;
import java.util.Map;
......@@ -34,6 +35,10 @@ public class TraceDagGetWithTimeSlice extends AbstractGet {
super(role, clusterContext, selfContext);
}
@Override protected Class<? extends JsonElement> responseClass() {
return JsonObject.class;
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(NodeCompLoad.WorkerRole.INSTANCE).create(this);
......@@ -43,7 +48,7 @@ public class TraceDagGetWithTimeSlice extends AbstractGet {
}
@Override protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
JsonElement response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
if (!parameter.containsKey("startTime") || !parameter.containsKey("endTime") || !parameter.containsKey("timeSliceType")) {
throw new ArgumentsParseException("the request parameter must contains startTime,endTime,timeSliceType");
}
......@@ -87,7 +92,7 @@ public class TraceDagGetWithTimeSlice extends AbstractGet {
JsonObject result = getBuilder().build(compResponse.get(Const.RESULT).getAsJsonArray(), nodeMappingResponse.get(Const.RESULT).getAsJsonArray(),
nodeRefResponse.get(Const.RESULT).getAsJsonArray(), resSumResponse.get(Const.RESULT).getAsJsonArray());
response.add(Const.RESULT, result);
((JsonObject)response).add(Const.RESULT, result);
}
private JsonObject getNewResponse() {
......
org.skywalking.apm.collector.worker.grpcserver.TraceSegmentServiceImpl
\ No newline at end of file
org.skywalking.apm.collector.worker.grpcserver.GRPCAddressRegister$Factory
org.skywalking.apm.collector.worker.globaltrace.persistence.GlobalTraceAgg$Factory
org.skywalking.apm.collector.worker.noderef.persistence.NodeRefDayAgg$Factory
......
......@@ -2,6 +2,7 @@ org.skywalking.apm.collector.worker.segment.analysis.SegmentAnalysis$Factory
org.skywalking.apm.collector.worker.segment.analysis.SegmentCostAnalysis$Factory
org.skywalking.apm.collector.worker.segment.analysis.SegmentExceptionAnalysis$Factory
org.skywalking.apm.collector.worker.segment.SegmentReceiver$Factory
org.skywalking.apm.collector.worker.segment.persistence.SegmentSave$Factory
org.skywalking.apm.collector.worker.segment.persistence.SegmentCostSave$Factory
org.skywalking.apm.collector.worker.segment.persistence.SegmentExceptionSave$Factory
......
org.skywalking.apm.collector.cluster.ClusterConfigProvider
org.skywalking.apm.collector.worker.config.EsConfigProvider
org.skywalking.apm.collector.worker.config.HttpConfigProvider
org.skywalking.apm.collector.worker.config.GRPCConfigProvider
org.skywalking.apm.collector.worker.config.CacheSizeConfigProvider
org.skywalking.apm.collector.worker.config.WorkerConfigProvider
\ No newline at end of file
......@@ -2,4 +2,6 @@ org.skywalking.apm.collector.worker.noderef.NodeRefResSumGetGroupWithTimeSlice$F
org.skywalking.apm.collector.worker.segment.SegmentTopGet$Factory
org.skywalking.apm.collector.worker.globaltrace.GlobalTraceGetWithGlobalId$Factory
org.skywalking.apm.collector.worker.span.SpanGetWithId$Factory
org.skywalking.apm.collector.worker.tracedag.TraceDagGetWithTimeSlice$Factory
\ No newline at end of file
org.skywalking.apm.collector.worker.tracedag.TraceDagGetWithTimeSlice$Factory
org.skywalking.apm.collector.worker.grpcserver.GRPCAddressGet$Factory
\ No newline at end of file
......@@ -6,7 +6,7 @@ cluster.current.port = 11800
# In this version, all members have same roles, and everyone of them is listening the status of others.
#The routers do not send message to nodes, which is unreachable, caused by network trouble, jvm crash or any other reasons.
cluster.current.roles=WorkersListener
cluster.current.roles=WorkersListener,RPCAddressListener
#Initial contact points of the cluster, e.g. seed_nodes = 127.0.0.1:11800, 127.0.0.1:11801.
#The nodes to join automatically at startup.
......@@ -27,7 +27,7 @@ es.cluster.nodes=127.0.0.1:9300
#auto: create index when it doesn't exist.
# forced: delete and create.
# manual: do nothing.
es.index.initialize.mode=auto
es.index.initialize.mode=forced
# Config of shards or replicas in Elasticsearch.
es.index.shards.number=2
es.index.replicas.number=0
......@@ -39,6 +39,10 @@ http.port=12800
# Web context path
http.contextPath=/
# GRPC services
grpc.hostname=127.0.0.1
grpc.port=22800
# Cache size of analysis worker. The value determines whether sending to next worker and clear, or not.
cache.analysis.size=1024
# Cache size of persistence worker. The value determines whether save data and clear, or not.
......
......@@ -14,16 +14,19 @@
</Policies>
<DefaultRolloverStrategy max="30"/>
</RollingFile>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<logger name="org.skywalking.apm.collector" level="debug">
<AppenderRef ref="RollingFile"/>
<logger name="org.skywalking.apm.collector" level="debug" additivity="false">
<AppenderRef ref="Console"/>
</logger>
<logger name="org.skywalking.apm.collector.worker.storage" level="error">
<AppenderRef ref="RollingFile"/>
<logger name="org.skywalking.apm.collector.worker.storage" level="error" additivity="false">
<AppenderRef ref="Console"/>
</logger>
<Root level="INFO">
<AppenderRef ref="RollingFile"/>
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
package org.skywalking.apm.collector.worker.httpserver;
import java.io.BufferedReader;
import java.io.PrintWriter;
import java.io.StringReader;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.skywalking.apm.collector.actor.LocalSyncWorkerRef;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* @author pengys5
*/
public class PostWithHttpServletTestCase {
private LocalSyncWorkerRef workerRef;
private AbstractPost.PostWithHttpServlet servlet;
private HttpServletRequest request;
private HttpServletResponse response;
private PrintWriter writer;
@Before
public void init() throws Exception {
workerRef = mock(LocalSyncWorkerRef.class);
servlet = new AbstractPost.PostWithHttpServlet(workerRef);
request = mock(HttpServletRequest.class);
response = mock(HttpServletResponse.class);
writer = mock(PrintWriter.class);
when(response.getWriter()).thenReturn(writer);
}
@Test
public void testDoPost() throws Exception {
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Integer status = (Integer)invocation.getArguments()[0];
Assert.assertEquals(new Integer(200), status);
return null;
}
}).when(response).setStatus(anyInt());
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Segment segment = (Segment)invocation.getArguments()[0];
Assert.assertEquals("TestTest2", segment.getTraceSegmentId());
return null;
}
}).when(workerRef).tell(any(Segment.class));
BufferedReader bufferedReader = new BufferedReader(new StringReader("[{\"ts\":\"TestTest2\"}]"));
when(request.getReader()).thenReturn(bufferedReader);
servlet.doPost(request, response);
}
@Test
public void testDoPostError() throws Exception {
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Integer status = (Integer)invocation.getArguments()[0];
Assert.assertEquals(new Integer(500), status);
return null;
}
}).when(response).setStatus(anyInt());
doThrow(new WorkerInvokeException("")).when(workerRef).tell(anyString());
servlet.doPost(request, response);
}
}
package org.skywalking.apm.collector.worker.httpserver;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.util.Map;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
......@@ -19,13 +20,17 @@ public class TestAbstractGet extends AbstractGet {
super(role, clusterContext, selfContext);
}
@Override protected Class<? extends JsonElement> responseClass() {
return JsonObject.class;
}
@Override
public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
JsonElement response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
}
......
package org.skywalking.apm.collector.worker.httpserver;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.util.Map;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
......@@ -19,13 +20,17 @@ public class TestAbstractPost extends AbstractPost {
super(role, clusterContext, selfContext);
}
@Override protected Class<? extends JsonElement> responseClass() {
return JsonObject.class;
}
@Override
public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
JsonElement response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
}
......
package org.skywalking.apm.collector.worker.httpserver;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.BufferedReader;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
......@@ -19,6 +20,10 @@ public class TestAbstractStreamPost extends AbstractStreamPost {
super(role, clusterContext, selfContext);
}
@Override protected Class<? extends JsonElement> responseClass() {
return JsonObject.class;
}
@Override
public void preStart() throws ProviderNotFoundException {
super.preStart();
......
package org.skywalking.apm.collector.worker.segment;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.worker.segment.mock.SegmentMock;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.TraceSegmentServiceGrpc;
import org.skywalking.apm.network.proto.UpstreamSegment;
/**
* @author pengys5
*/
public class SegmentRealPost {
private static Logger logger = LogManager.getFormatterLogger(SegmentRealPost.class);
public static void main(String[] args) throws Exception {
SegmentMock mock = new SegmentMock();
// String cacheServiceExceptionSegmentAsString = mock.mockCacheServiceExceptionSegmentAsString();
// HttpClientTools.INSTANCE.post("http://localhost:7001/segments", cacheServiceExceptionSegmentAsString);
//
// String portalServiceExceptionSegmentAsString = mock.mockPortalServiceExceptionSegmentAsString();
// HttpClientTools.INSTANCE.post("http://localhost:7001/segments", portalServiceExceptionSegmentAsString);
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 22800)
.usePlaintext(true)
.build();
TraceSegmentServiceGrpc.TraceSegmentServiceStub stub = TraceSegmentServiceGrpc.newStub(channel);
StreamObserver<UpstreamSegment> observer = stub.collect(new StreamObserver<Downstream>() {
@Override public void onNext(Downstream downstream) {
}
@Override public void onError(Throwable throwable) {
String cacheServiceSegmentAsString = mock.mockCacheServiceSegmentAsString();
System.out.println(cacheServiceSegmentAsString);
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", cacheServiceSegmentAsString);
}
String persistenceServiceSegmentAsString = mock.mockPersistenceServiceSegmentAsString();
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", persistenceServiceSegmentAsString);
@Override public void onCompleted() {
String portalServiceSegmentAsString = mock.mockPortalServiceSegmentAsString();
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", portalServiceSegmentAsString);
}
});
// String specialSegmentAsString = mock.mockSpecialSegmentAsString();
// HttpClientTools.INSTANCE.post("http://localhost:7001/segments", specialSegmentAsString);
List<UpstreamSegment> upstreamSegmentList = SegmentMock.mockPortalServiceSegment();
logger.debug("upstreamSegmentList size: %s", upstreamSegmentList.size());
upstreamSegmentList.forEach(upstreamSegment -> {
observer.onNext(upstreamSegment);
});
observer.onCompleted();
Thread.sleep(2000);
}
}
......@@ -58,12 +58,12 @@ import static org.powermock.api.mockito.PowerMockito.mock;
@RunWith(PowerMockRunner.class)
@PrepareForTest({LocalWorkerContext.class, WorkerRef.class})
@PowerMockIgnore({"javax.management.*"})
public class SegmentPostTestCase {
public class SegmentReceiverTestCase {
private Logger logger = LogManager.getFormatterLogger(SegmentPostTestCase.class);
private Logger logger = LogManager.getFormatterLogger(SegmentReceiverTestCase.class);
private SegmentMock segmentMock;
private SegmentPost segmentPost;
private SegmentReceiver segmentReceiver;
private LocalWorkerContext localWorkerContext;
private ClusterWorkerContext clusterWorkerContext;
......@@ -76,7 +76,7 @@ public class SegmentPostTestCase {
clusterWorkerContext = PowerMockito.mock(ClusterWorkerContext.class);
localWorkerContext = new LocalWorkerContext();
segmentPost = new SegmentPost(SegmentPost.WorkerRole.INSTANCE, clusterWorkerContext, localWorkerContext);
segmentReceiver = new SegmentReceiver(SegmentReceiver.WorkerRole.INSTANCE, clusterWorkerContext, localWorkerContext);
initNodeNodeMappingAnalysis();
initNodeCompAnalysis();
......@@ -89,16 +89,15 @@ public class SegmentPostTestCase {
@Test
public void testRole() {
Assert.assertEquals(SegmentPost.class.getSimpleName(), SegmentPost.WorkerRole.INSTANCE.roleName());
Assert.assertEquals(RollingSelector.class.getSimpleName(), SegmentPost.WorkerRole.INSTANCE.workerSelector().getClass().getSimpleName());
Assert.assertEquals(SegmentReceiver.class.getSimpleName(), SegmentReceiver.WorkerRole.INSTANCE.roleName());
Assert.assertEquals(RollingSelector.class.getSimpleName(), SegmentReceiver.WorkerRole.INSTANCE.workerSelector().getClass().getSimpleName());
}
@Test
public void testFactory() {
SegmentPost.Factory factory = new SegmentPost.Factory();
Assert.assertEquals(SegmentPost.class.getSimpleName(), factory.role().roleName());
Assert.assertEquals(SegmentPost.class.getSimpleName(), factory.workerInstance(null).getClass().getSimpleName());
Assert.assertEquals("/segments", factory.servletPath());
SegmentReceiver.Factory factory = new SegmentReceiver.Factory();
Assert.assertEquals(SegmentReceiver.class.getSimpleName(), factory.role().roleName());
Assert.assertEquals(SegmentReceiver.class.getSimpleName(), factory.workerInstance(null).getClass().getSimpleName());
}
@Test
......@@ -141,7 +140,7 @@ public class SegmentPostTestCase {
ArgumentCaptor<Role> argumentCaptor = ArgumentCaptor.forClass(Role.class);
segmentPost.preStart();
segmentReceiver.preStart();
verify(clusterWorkerContext, times(17)).findProvider(argumentCaptor.capture());
Assert.assertEquals(GlobalTraceAnalysis.Role.INSTANCE.roleName(), argumentCaptor.getAllValues().get(0).roleName());
......@@ -178,7 +177,7 @@ public class SegmentPostTestCase {
JsonObject response = new JsonObject();
BufferedReader reader = new BufferedReader(new StringReader(jsonStr.length() + " " + jsonStr));
segmentPost.onReceive(reader, response);
// segmentReceiver.onReceive(reader, response);
}
private SegmentSaveAnswer segmentSaveAnswer_1;
......@@ -304,7 +303,7 @@ public class SegmentPostTestCase {
public void testOnReceive() throws Exception {
String cacheServiceSegmentAsString = segmentMock.mockCacheServiceSegmentAsString();
segmentPost.onReceive(new BufferedReader(new StringReader(cacheServiceSegmentAsString)), new JsonObject());
// segmentReceiver.onReceive(new BufferedReader(new StringReader(cacheServiceSegmentAsString)), new JsonObject());
Assert.assertEquals(DateTools.changeToUTCSlice(201703310915L), segmentSaveAnswer_1.minute);
Assert.assertEquals(DateTools.changeToUTCSlice(201703310900L), segmentSaveAnswer_1.hour);
......@@ -341,11 +340,11 @@ public class SegmentPostTestCase {
public class SegmentOtherAnswer implements Answer<Object> {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice;
SegmentReceiver.SegmentWithTimeSlice segmentWithTimeSlice;
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)invocation.getArguments()[0];
segmentWithTimeSlice = (SegmentReceiver.SegmentWithTimeSlice)invocation.getArguments()[0];
return null;
}
}
......@@ -384,9 +383,9 @@ public class SegmentPostTestCase {
}
}
class IsSegmentWithTimeSlice extends ArgumentMatcher<SegmentPost.SegmentWithTimeSlice> {
class IsSegmentWithTimeSlice extends ArgumentMatcher<SegmentReceiver.SegmentWithTimeSlice> {
public boolean matches(Object para) {
return para instanceof SegmentPost.SegmentWithTimeSlice;
return para instanceof SegmentReceiver.SegmentWithTimeSlice;
}
}
}
......@@ -2,9 +2,11 @@ package org.skywalking.apm.collector.worker.segment.entity;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.network.proto.TraceSegmentObject;
/**
* @author pengys5
......@@ -21,17 +23,16 @@ public enum SegmentDeserializeFromFile {
* @return on {@link List <Segment>}
* @throws Exception if json data illegal or file broken.
*/
public List<Segment> deserializeMultiple(String segmentJsonFile) throws Exception {
List<Segment> segmentList = new ArrayList<>();
public List<TraceSegmentObject> deserializeMultiple(String segmentJsonFile) throws Exception {
List<TraceSegmentObject> segmentList = new ArrayList<>();
streamReader(segmentList, new FileReader(segmentJsonFile));
return segmentList;
}
private void streamReader(List<Segment> segmentList, FileReader fileReader) throws Exception {
private void streamReader(List<TraceSegmentObject> segmentList, FileReader fileReader) throws Exception {
JsonArray segmentArray = gson.fromJson(fileReader, JsonArray.class);
for (int i = 0; i < segmentArray.size(); i++) {
Segment segment = gson.fromJson(segmentArray.get(i), Segment.class);
segmentList.add(segment);
JsonObject segmentObj = segmentArray.get(i).getAsJsonObject();
}
}
}
package org.skywalking.apm.collector.worker.segment.entity.tag;
import org.junit.Assert;
import org.junit.Test;
import org.skywalking.apm.collector.worker.segment.entity.Span;
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class BooleanTagTestCase {
@Test
public void test() throws NoSuchFieldException, IllegalAccessException {
BooleanTag booleanTag = new BooleanTag("test", false);
Map<String, Boolean> tagsWithInt = new LinkedHashMap<>();
Span span = new Span();
Field testAField = span.getClass().getDeclaredField("tagsWithBool");
testAField.setAccessible(true);
testAField.set(span, tagsWithInt);
Assert.assertEquals(false, booleanTag.get(span));
tagsWithInt.put("test", true);
Assert.assertEquals(true, booleanTag.get(span));
}
}
package org.skywalking.apm.collector.worker.segment.entity.tag;
import org.junit.Assert;
import org.junit.Test;
import org.skywalking.apm.collector.worker.segment.entity.Span;
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class IntTagTestCase {
@Test
public void test() throws NoSuchFieldException, IllegalAccessException {
IntTag intTag = new IntTag("test");
Map<String, Integer> tagsWithInt = new LinkedHashMap<>();
Span span = new Span();
Field testAField = span.getClass().getDeclaredField("tagsWithInt");
testAField.setAccessible(true);
testAField.set(span, tagsWithInt);
Assert.assertEquals(null, intTag.get(span));
tagsWithInt.put("test", 10);
Assert.assertEquals(10, intTag.get(span).intValue());
}
}
package org.skywalking.apm.collector.worker.segment.entity.tag;
import org.junit.Assert;
import org.junit.Test;
import org.skywalking.apm.collector.worker.segment.entity.Span;
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class ShortTagTestCase {
@Test
public void test() throws NoSuchFieldException, IllegalAccessException {
ShortTag shortTag = new ShortTag("short");
Map<String, Integer> tagsWithInt = new LinkedHashMap<>();
Span span = new Span();
Field testAField = span.getClass().getDeclaredField("tagsWithInt");
testAField.setAccessible(true);
testAField.set(span, tagsWithInt);
Short tag = shortTag.get(span);
Assert.assertEquals(null, tag);
tagsWithInt.put("short", 10);
tag = shortTag.get(span);
Assert.assertEquals(10, tag.intValue());
}
}
package org.skywalking.apm.collector.worker.segment.entity.tag;
import org.junit.Assert;
import org.junit.Test;
import org.skywalking.apm.collector.worker.segment.entity.Span;
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class TagsTestCase {
@Test
public void test() throws NoSuchFieldException, IllegalAccessException {
Span span = new Span();
Map<String, String> tagsWithStr = new LinkedHashMap<>();
tagsWithStr.put("span.layer", "db");
Field testAField = span.getClass().getDeclaredField("tagsWithStr");
testAField.setAccessible(true);
testAField.set(span, tagsWithStr);
Assert.assertEquals("db", Tags.SPAN_LAYER.get(span));
}
}
package org.skywalking.apm.collector.worker.segment.mock;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.network.proto.KeyWithStringValue;
/**
* @author pengys5
*/
public enum KeyWithStringValueFromJson {
INSTANCE;
private static final String key = "key";
private static final String value = "value";
public List<KeyWithStringValue.Builder> build(JsonArray kvArray) {
List<KeyWithStringValue.Builder> kv = new LinkedList<>();
for (int i = 0; i < kvArray.size(); i++) {
JsonObject tagJson = kvArray.get(i).getAsJsonObject();
KeyWithStringValue.Builder builder = KeyWithStringValue.newBuilder();
buildKV(builder, tagJson);
kv.add(builder);
}
return kv;
}
private void buildKV(KeyWithStringValue.Builder builder, JsonObject tagJson) {
builder.setKey(tagJson.get(key).getAsString());
builder.setValue(tagJson.get(value).getAsString());
}
}
package org.skywalking.apm.collector.worker.segment.mock;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.network.proto.KeyWithStringValue;
import org.skywalking.apm.network.proto.LogMessage;
/**
* @author pengys5
*/
public enum LogsFromJson {
INSTANCE;
private static final String time = "time";
private static final String data = "data";
public List<LogMessage.Builder> build(JsonArray logsArray) {
List<LogMessage.Builder> logs = new LinkedList<>();
for (int i = 0; i < logsArray.size(); i++) {
JsonObject logJson = logsArray.get(i).getAsJsonObject();
LogMessage.Builder builder = LogMessage.newBuilder();
buildLogMessage(builder, logJson);
logs.add(builder);
}
return logs;
}
private void buildLogMessage(LogMessage.Builder builder, JsonObject logJson) {
if (logJson.has(time)) {
builder.setTime(logJson.get(time).getAsLong());
}
if (logJson.has(data)) {
List<KeyWithStringValue.Builder> logsBuilders = TagsFromJson.INSTANCE.build(logJson.get(data).getAsJsonArray());
logsBuilders.forEach(logsBuilder -> {
builder.addData(logsBuilder);
});
}
}
}
package org.skywalking.apm.collector.worker.segment.mock;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.network.proto.TraceSegmentReference;
/**
* @author pengys5
*/
public enum ReferencesFromJson {
INSTANCE;
private static final String parentTraceSegmentId = "parentTraceSegmentId";
private static final String parentSpanId = "parentSpanId";
private static final String parentApplicationId = "parentApplicationId";
private static final String networkAddress = "networkAddress";
private static final String networkAddressId = "networkAddressId";
private static final String entryServiceName = "entryServiceName";
private static final String entryServiceId = "entryServiceId";
public List<TraceSegmentReference.Builder> build(JsonArray spanArray) {
List<TraceSegmentReference.Builder> references = new LinkedList<>();
for (int i = 0; i < spanArray.size(); i++) {
JsonObject spanJson = spanArray.get(i).getAsJsonObject();
TraceSegmentReference.Builder builder = TraceSegmentReference.newBuilder();
buildRef(builder, spanJson);
references.add(builder);
}
return references;
}
private void buildRef(TraceSegmentReference.Builder builder, JsonObject referenceJsonObj) {
if (referenceJsonObj.has(parentTraceSegmentId)) {
builder.setParentTraceSegmentId(referenceJsonObj.get(parentTraceSegmentId).getAsString());
}
if (referenceJsonObj.has(parentSpanId)) {
builder.setParentSpanId(referenceJsonObj.get(parentSpanId).getAsInt());
}
if (referenceJsonObj.has(parentApplicationId)) {
builder.setParentApplicationId(referenceJsonObj.get(parentApplicationId).getAsInt());
}
if (referenceJsonObj.has(networkAddress)) {
builder.setNetworkAddress(referenceJsonObj.get(networkAddress).getAsString());
}
if (referenceJsonObj.has(networkAddressId)) {
builder.setNetworkAddressId(referenceJsonObj.get(networkAddressId).getAsInt());
}
if (referenceJsonObj.has(entryServiceName)) {
builder.setEntryServiceName(referenceJsonObj.get(entryServiceName).getAsString());
}
if (referenceJsonObj.has(entryServiceId)) {
builder.setEntryServiceId(referenceJsonObj.get(entryServiceId).getAsInt());
}
}
}
package org.skywalking.apm.collector.worker.segment.mock;
import com.google.gson.JsonObject;
import java.util.List;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.network.proto.TraceSegmentReference;
/**
* @author pengys5
*/
public enum SegmentFromJson {
INSTANCE;
private static final String traceSegmentId = "traceSegmentId";
private static final String refs = "refs";
private static final String spans = "spans";
private static final String applicationId = "applicationId";
private static final String applicationInstanceId = "applicationInstanceId";
public TraceSegmentObject build(JsonObject segmentJsonObj) {
TraceSegmentObject.Builder builder = TraceSegmentObject.newBuilder();
buildSegment(builder, segmentJsonObj);
return builder.build();
}
private void buildSegment(TraceSegmentObject.Builder builder, JsonObject segmentJsonObj) {
builder.setTraceSegmentId(segmentJsonObj.get(traceSegmentId).getAsString());
builder.setApplicationId(segmentJsonObj.get(applicationId).getAsInt());
builder.setApplicationInstanceId(segmentJsonObj.get(applicationInstanceId).getAsInt());
if (segmentJsonObj.has(refs)) {
List<TraceSegmentReference.Builder> refBuilders = ReferencesFromJson.INSTANCE.build(segmentJsonObj.get(refs).getAsJsonArray());
refBuilders.forEach(refBuilder -> builder.addRefs(refBuilder));
}
List<SpanObject.Builder> spanBuilders = SpanFromJson.INSTANCE.build(segmentJsonObj.get(spans).getAsJsonArray());
spanBuilders.forEach(spanBuilder -> builder.addSpans(spanBuilder));
}
}
\ No newline at end of file
package org.skywalking.apm.collector.worker.segment.mock;
import com.google.gson.JsonArray;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.queue.EndOfBatchCommand;
import org.skywalking.apm.collector.worker.AnalysisMember;
import org.skywalking.apm.collector.worker.segment.SegmentPost;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.SegmentReceiver;
import org.skywalking.apm.collector.worker.segment.entity.SegmentDeserializeFromFile;
import org.skywalking.apm.collector.worker.tools.DateTools;
import org.skywalking.apm.collector.worker.tools.JsonFileReader;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.network.proto.UpstreamSegment;
/**
* @author pengys5
*/
public class SegmentMock {
private String path = this.getClass().getResource("/").getPath();
private static String path = SegmentMock.class.getResource("/").getPath();
private final String CacheServiceJsonFile = path + "/json/segment/post/normal/cache-service.json";
private final String PersistenceServiceJsonFile = path + "/json/segment/post/normal/persistence-service.json";
private final String PortalServiceJsonFile = path + "/json/segment/post/normal/portal-service.json";
public static final String PortalServiceJsonFile = path + "/json/segment/grpc/normal/portal-service.json";
private final String CacheServiceExceptionJsonFile = path + "/json/segment/post/exception/cache-service.json";
private final String PortalServiceExceptionJsonFile = path + "/json/segment/post/exception/portal-service.json";
......@@ -42,65 +44,73 @@ public class SegmentMock {
return JsonFileReader.INSTANCE.read(PersistenceServiceJsonFile);
}
public String mockPortalServiceSegmentAsString() throws FileNotFoundException {
return JsonFileReader.INSTANCE.read(PortalServiceJsonFile);
public static List<UpstreamSegment> mockPortalServiceSegment() throws FileNotFoundException {
List<UpstreamSegment> upstreamSegmentList = new LinkedList<>();
JsonArray segmentArray = JsonFileReader.INSTANCE.parse(PortalServiceJsonFile).getAsJsonArray();
segmentArray.forEach(segmentObj -> {
UpstreamSegment upstreamSegment = UpstreamSegmentFromJson.INSTANCE.build(segmentObj.getAsJsonObject());
upstreamSegmentList.add(upstreamSegment);
});
return upstreamSegmentList;
}
public List<SegmentPost.SegmentWithTimeSlice> mockCacheServiceExceptionSegmentTimeSlice() throws Exception {
public List<SegmentReceiver.SegmentWithTimeSlice> mockCacheServiceExceptionSegmentTimeSlice() throws Exception {
return createSegmentWithTimeSliceList(CacheServiceExceptionJsonFile);
}
public List<SegmentPost.SegmentWithTimeSlice> mockPortalServiceExceptionSegmentTimeSlice() throws Exception {
public List<SegmentReceiver.SegmentWithTimeSlice> mockPortalServiceExceptionSegmentTimeSlice() throws Exception {
return createSegmentWithTimeSliceList(PortalServiceExceptionJsonFile);
}
public List<SegmentPost.SegmentWithTimeSlice> mockCacheServiceSegmentSegmentTimeSlice() throws Exception {
public List<SegmentReceiver.SegmentWithTimeSlice> mockCacheServiceSegmentSegmentTimeSlice() throws Exception {
return createSegmentWithTimeSliceList(CacheServiceJsonFile);
}
public List<SegmentPost.SegmentWithTimeSlice> mockPersistenceServiceSegmentTimeSlice() throws Exception {
public List<SegmentReceiver.SegmentWithTimeSlice> mockPersistenceServiceSegmentTimeSlice() throws Exception {
return createSegmentWithTimeSliceList(PersistenceServiceJsonFile);
}
public List<SegmentPost.SegmentWithTimeSlice> mockPortalServiceSegmentSegmentTimeSlice() throws Exception {
public List<SegmentReceiver.SegmentWithTimeSlice> mockPortalServiceSegmentSegmentTimeSlice() throws Exception {
return createSegmentWithTimeSliceList(PortalServiceJsonFile);
}
private List<SegmentPost.SegmentWithTimeSlice> createSegmentWithTimeSliceList(
private List<SegmentReceiver.SegmentWithTimeSlice> createSegmentWithTimeSliceList(
String jsonFilePath) throws Exception {
List<Segment> segmentList = SegmentDeserializeFromFile.INSTANCE.deserializeMultiple(jsonFilePath);
List<TraceSegmentObject> segmentList = SegmentDeserializeFromFile.INSTANCE.deserializeMultiple(jsonFilePath);
List<SegmentPost.SegmentWithTimeSlice> segmentWithTimeSliceList = new ArrayList<>();
for (Segment segment : segmentList) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = createSegmentWithTimeSlice(segment);
List<SegmentReceiver.SegmentWithTimeSlice> segmentWithTimeSliceList = new ArrayList<>();
for (TraceSegmentObject segment : segmentList) {
SegmentReceiver.SegmentWithTimeSlice segmentWithTimeSlice = createSegmentWithTimeSlice(segment);
segmentWithTimeSliceList.add(segmentWithTimeSlice);
}
return segmentWithTimeSliceList;
}
private SegmentPost.SegmentWithTimeSlice createSegmentWithTimeSlice(Segment segment) {
long minuteSlice = DateTools.getMinuteSlice(segment.getStartTime());
long hourSlice = DateTools.getHourSlice(segment.getStartTime());
long daySlice = DateTools.getDaySlice(segment.getStartTime());
int second = DateTools.getSecond(segment.getStartTime());
private SegmentReceiver.SegmentWithTimeSlice createSegmentWithTimeSlice(TraceSegmentObject segment) {
// long minuteSlice = DateTools.getMinuteSlice(segment.getStartTime());
// long hourSlice = DateTools.getHourSlice(segment.getStartTime());
// long daySlice = DateTools.getDaySlice(segment.getStartTime());
// int second = DateTools.getSecond(segment.getStartTime());
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = new SegmentPost.SegmentWithTimeSlice(segment, minuteSlice, hourSlice, daySlice, second);
return segmentWithTimeSlice;
// SegmentReceiver.SegmentWithTimeSlice segmentWithTimeSlice = new SegmentReceiver.SegmentWithTimeSlice(segment, minuteSlice, hourSlice, daySlice, second);
// return segmentWithTimeSlice;
return null;
}
public void executeAnalysis(AnalysisMember analysis) throws Exception {
List<SegmentPost.SegmentWithTimeSlice> cacheServiceSegment = this.mockCacheServiceSegmentSegmentTimeSlice();
for (SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice : cacheServiceSegment) {
List<SegmentReceiver.SegmentWithTimeSlice> cacheServiceSegment = this.mockCacheServiceSegmentSegmentTimeSlice();
for (SegmentReceiver.SegmentWithTimeSlice segmentWithTimeSlice : cacheServiceSegment) {
analysis.analyse(segmentWithTimeSlice);
}
List<SegmentPost.SegmentWithTimeSlice> portalServiceSegment = this.mockPortalServiceSegmentSegmentTimeSlice();
for (SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice : portalServiceSegment) {
List<SegmentReceiver.SegmentWithTimeSlice> portalServiceSegment = this.mockPortalServiceSegmentSegmentTimeSlice();
for (SegmentReceiver.SegmentWithTimeSlice segmentWithTimeSlice : portalServiceSegment) {
analysis.analyse(segmentWithTimeSlice);
}
List<SegmentPost.SegmentWithTimeSlice> persistenceServiceSegment = this.mockPersistenceServiceSegmentTimeSlice();
for (SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice : persistenceServiceSegment) {
List<SegmentReceiver.SegmentWithTimeSlice> persistenceServiceSegment = this.mockPersistenceServiceSegmentTimeSlice();
for (SegmentReceiver.SegmentWithTimeSlice segmentWithTimeSlice : persistenceServiceSegment) {
analysis.analyse(segmentWithTimeSlice);
}
......
package org.skywalking.apm.collector.worker.segment.mock;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.network.proto.KeyWithStringValue;
import org.skywalking.apm.network.proto.LogMessage;
import org.skywalking.apm.network.proto.SpanObject;
/**
* @author pengys5
*/
public enum SpanFromJson {
INSTANCE;
private static final String spanId = "spanId";
private static final String parentSpanId = "parentSpanId";
private static final String startTime = "startTime";
private static final String endTime = "endTime";
private static final String operationNameId = "operationNameId";
private static final String operationName = "operationName";
private static final String peerId = "peerId";
private static final String peer = "peer";
private static final String spanType = "spanType";
private static final String spanLayer = "spanLayer";
private static final String componentId = "componentId";
private static final String component = "component";
private static final String isError = "isError";
private static final String tags = "tags";
private static final String logs = "logs";
public List<SpanObject.Builder> build(JsonArray spanArray) {
List<SpanObject.Builder> spans = new LinkedList<>();
for (int i = 0; i < spanArray.size(); i++) {
JsonObject spanJson = spanArray.get(i).getAsJsonObject();
SpanObject.Builder builder = SpanObject.newBuilder();
buildSpan(builder, spanJson);
spans.add(builder);
}
return spans;
}
private void buildSpan(SpanObject.Builder builder, JsonObject spanJsonObj) {
if (spanJsonObj.has(spanId)) {
builder.setSpanId(spanJsonObj.get(spanId).getAsInt());
}
if (spanJsonObj.has(parentSpanId)) {
builder.setParentSpanId(spanJsonObj.get(parentSpanId).getAsInt());
}
if (spanJsonObj.has(startTime)) {
builder.setStartTime(spanJsonObj.get(startTime).getAsLong());
}
if (spanJsonObj.has(endTime)) {
builder.setEndTime(spanJsonObj.get(endTime).getAsLong());
}
if (spanJsonObj.has(operationNameId)) {
builder.setOperationNameId(spanJsonObj.get(operationNameId).getAsInt());
}
if (spanJsonObj.has(operationName)) {
builder.setOperationName(spanJsonObj.get(operationName).getAsString());
}
if (spanJsonObj.has(peerId)) {
builder.setPeerId(spanJsonObj.get(peerId).getAsInt());
}
if (spanJsonObj.has(peer)) {
builder.setPeer(spanJsonObj.get(peer).getAsString());
}
if (spanJsonObj.has(spanType)) {
builder.setSpanTypeValue(spanJsonObj.get(spanType).getAsInt());
}
if (spanJsonObj.has(spanLayer)) {
builder.setSpanLayerValue(spanJsonObj.get(spanLayer).getAsInt());
}
if (spanJsonObj.has(component)) {
builder.setComponent(spanJsonObj.get(component).getAsString());
}
if (spanJsonObj.has(componentId)) {
builder.setComponentId(spanJsonObj.get(componentId).getAsInt());
}
if (spanJsonObj.has(isError)) {
builder.setIsError(spanJsonObj.get(isError).getAsBoolean());
}
if (spanJsonObj.has(tags)) {
List<KeyWithStringValue.Builder> tagsBuilders = TagsFromJson.INSTANCE.build(spanJsonObj.get(tags).getAsJsonArray());
tagsBuilders.forEach(tagsBuilder -> {
builder.addTags(tagsBuilder);
});
}
if (spanJsonObj.has(logs)) {
List<LogMessage.Builder> logsBuilders = LogsFromJson.INSTANCE.build(spanJsonObj.get(logs).getAsJsonArray());
logsBuilders.forEach(logsBuilder -> {
builder.addLogs(logsBuilder);
});
}
}
}
package org.skywalking.apm.collector.worker.segment.mock;
import com.google.gson.JsonArray;
import java.util.List;
import org.skywalking.apm.network.proto.KeyWithStringValue;
/**
* @author pengys5
*/
public enum TagsFromJson {
INSTANCE;
public List<KeyWithStringValue.Builder> build(JsonArray tagsArray) {
return KeyWithStringValueFromJson.INSTANCE.build(tagsArray);
}
}
package org.skywalking.apm.collector.worker.segment.mock;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.network.proto.UpstreamSegment;
/**
* @author pengys5
*/
public enum UpstreamSegmentFromJson {
INSTANCE;
private static final String globalTraceIds = "globalTraceIds";
private static final String segment = "segment";
public UpstreamSegment build(JsonObject upstreamJsonObj) {
UpstreamSegment.Builder builder = UpstreamSegment.newBuilder();
buildUpStream(builder, upstreamJsonObj);
return builder.build();
}
private void buildUpStream(UpstreamSegment.Builder builder, JsonObject upstreamJsonObj) {
if (upstreamJsonObj.has(globalTraceIds)) {
JsonArray globalTraceIdArray = upstreamJsonObj.get(globalTraceIds).getAsJsonArray();
globalTraceIdArray.forEach(globalTraceIdElement -> {
builder.addGlobalTraceIds(globalTraceIdElement.getAsString());
});
}
TraceSegmentObject segmentObject = SegmentFromJson.INSTANCE.build(upstreamJsonObj.get(segment).getAsJsonObject());
builder.setSegment(segmentObject.toByteString());
}
}
package org.skywalking.apm.collector.worker.segment.mock;
import com.google.gson.JsonArray;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.FileNotFoundException;
import java.util.LinkedList;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
import org.skywalking.apm.collector.worker.tools.JsonFileReader;
import org.skywalking.apm.network.proto.KeyWithStringValue;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.network.proto.UpstreamSegment;
/**
* @author pengys5
*/
public class UpstreamSegmentFromJsonTestCase {
@Test
public void testBuild() throws FileNotFoundException, InvalidProtocolBufferException {
JsonArray segmentArray = JsonFileReader.INSTANCE.parse(SegmentMock.PortalServiceJsonFile).getAsJsonArray();
List<UpstreamSegment> upstreamSegmentList = new LinkedList<>();
segmentArray.forEach(segmentJsonObj -> {
UpstreamSegment upstreamSegment = UpstreamSegmentFromJson.INSTANCE.build(segmentJsonObj.getAsJsonObject());
upstreamSegmentList.add(upstreamSegment);
});
Assert.assertEquals(1, upstreamSegmentList.size());
UpstreamSegment upstreamSegment_1 = upstreamSegmentList.get(0);
Assert.assertEquals(1, upstreamSegment_1.getGlobalTraceIdsCount());
Assert.assertEquals("Trace.1490922929254.1797892356.6003.69.2", upstreamSegment_1.getGlobalTraceIds(0));
TraceSegmentObject segmentObject = TraceSegmentObject.parseFrom(upstreamSegment_1.getSegment());
Assert.assertEquals("Segment.1490922929254.1797892356.6003.69.1", segmentObject.getTraceSegmentId());
Assert.assertEquals(1, segmentObject.getApplicationId());
Assert.assertEquals(1, segmentObject.getApplicationInstanceId());
Assert.assertEquals(1, segmentObject.getSpansCount());
SpanObject span_0 = segmentObject.getSpans(0);
Assert.assertEquals(0, span_0.getSpanId());
Assert.assertEquals(-1, span_0.getParentSpanId());
Assert.assertEquals(0, span_0.getOperationNameId());
Assert.assertEquals("/portal/", span_0.getOperationName());
Assert.assertEquals(0, span_0.getPeerId());
Assert.assertEquals("0:0:0:0:0:0:0:1:57837", span_0.getPeer());
Assert.assertEquals(0, span_0.getSpanTypeValue());
Assert.assertEquals(2, span_0.getSpanLayerValue());
Assert.assertEquals(0, span_0.getComponentId());
Assert.assertEquals("Tomcat", span_0.getComponent());
Assert.assertEquals(false, span_0.getIsError());
Assert.assertEquals(2, span_0.getTagsCount());
KeyWithStringValue tag_0 = span_0.getTags(0);
Assert.assertEquals("key_1", tag_0.getKey());
Assert.assertEquals("value_1", tag_0.getValue());
KeyWithStringValue tag_1 = span_0.getTags(1);
Assert.assertEquals("key_2", tag_1.getKey());
Assert.assertEquals("value_2", tag_1.getValue());
}
}
package org.skywalking.apm.collector.worker.span.persistence;
import com.google.gson.JsonObject;
import java.util.TimeZone;
import org.elasticsearch.action.get.GetResponse;
import org.junit.Assert;
import org.junit.Before;
......@@ -18,9 +19,6 @@ import org.skywalking.apm.collector.worker.Const;
import org.skywalking.apm.collector.worker.segment.SegmentIndex;
import org.skywalking.apm.collector.worker.segment.mock.SegmentMock;
import org.skywalking.apm.collector.worker.storage.GetResponseFromEs;
import org.skywalking.apm.agent.core.context.trace.TraceSegment;
import java.util.TimeZone;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
......@@ -29,8 +27,8 @@ import static org.mockito.Mockito.when;
* @author pengys5
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest( {GetResponseFromEs.class})
@PowerMockIgnore( {"javax.management.*"})
@PrepareForTest({GetResponseFromEs.class})
@PowerMockIgnore({"javax.management.*"})
public class SpanSearchWithIdTestCase {
private GetResponseFromEs getResponseFromEs;
......@@ -78,14 +76,4 @@ public class SpanSearchWithIdTestCase {
Assert.assertEquals("/portal/", value);
}
private TraceSegment create() {
TraceSegment segment = new TraceSegment();
Span span = new Span();
span.setTag("Tag", "VALUE");
span.finish(segment);
segment.finish();
return segment;
}
}
......@@ -18,6 +18,11 @@ public enum JsonFileReader {
return jsonElement.toString();
}
public JsonElement parse(String path) throws FileNotFoundException {
JsonParser jsonParser = new JsonParser();
return jsonParser.parse(new FileReader(path));
}
public String readSegment(String path) throws FileNotFoundException {
JsonParser jsonParser = new JsonParser();
JsonElement jsonElement = jsonParser.parse(new FileReader(path));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册