提交 cf248844 编写于 作者: P peng-yongsheng

Segments divided into two types, depending on if ids exchange successful.

1. Success: send to analytic worker
2. Failure: write into file.
上级 3bc3cc53
......@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.agentserver;
import org.skywalking.apm.collector.core.config.GroupConfigParser;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
......@@ -28,10 +29,10 @@ import org.skywalking.apm.collector.core.module.ModuleInstaller;
public class AgentServerModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "agent_server";
private final AgentServerCommonModuleInstaller installer;
private final AgentServerModuleInstaller installer;
public AgentServerModuleGroupDefine() {
installer = new AgentServerCommonModuleInstaller();
installer = new AgentServerModuleInstaller();
}
@Override public String name() {
......@@ -45,4 +46,8 @@ public class AgentServerModuleGroupDefine implements ModuleGroupDefine {
@Override public ModuleInstaller moduleInstaller() {
return installer;
}
@Override public GroupConfigParser groupConfigParser() {
return null;
}
}
......@@ -20,12 +20,12 @@ package org.skywalking.apm.collector.agentserver;
import java.util.List;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.MultipleCommonModuleInstaller;
import org.skywalking.apm.collector.core.module.MultipleModuleInstaller;
/**
* @author pengys5
*/
public class AgentServerCommonModuleInstaller extends MultipleCommonModuleInstaller {
public class AgentServerModuleInstaller extends MultipleModuleInstaller {
@Override public String groupName() {
return AgentServerModuleGroupDefine.GROUP_NAME;
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agentstream;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.config.BufferFileConfig;
import org.skywalking.apm.collector.core.config.GroupConfigParser;
/**
* @author pengys5
*/
public class AgentStreamModuleGroupConfigParser implements GroupConfigParser {
private static final String BUFFER_OFFSET_MAX_FILE_SIZE = "buffer_offset_max_file_size";
private static final String BUFFER_SEGMENT_MAX_FILE_SIZE = "buffer_segment_max_file_size";
@Override public void parse(Map<String, Map> config) {
if (config.containsKey(GroupConfigParser.NODE_NAME)) {
Map<String, String> groupConfig = config.get(GroupConfigParser.NODE_NAME);
if (groupConfig.containsKey(BUFFER_OFFSET_MAX_FILE_SIZE)) {
String sizeStr = groupConfig.get(BUFFER_OFFSET_MAX_FILE_SIZE).toUpperCase();
if (sizeStr.endsWith("K")) {
int size = Integer.parseInt(sizeStr.replace("K", ""));
BufferFileConfig.BUFFER_OFFSET_MAX_FILE_SIZE = size * 1024;
} else if (sizeStr.endsWith("KB")) {
int size = Integer.parseInt(sizeStr.replace("KB", ""));
BufferFileConfig.BUFFER_OFFSET_MAX_FILE_SIZE = size * 1024;
} else if (sizeStr.endsWith("M")) {
int size = Integer.parseInt(sizeStr.replace("M", ""));
BufferFileConfig.BUFFER_OFFSET_MAX_FILE_SIZE = size * 1024 * 1024;
} else if (sizeStr.endsWith("MB")) {
int size = Integer.parseInt(sizeStr.replace("MB", ""));
BufferFileConfig.BUFFER_OFFSET_MAX_FILE_SIZE = size * 1024 * 1024;
} else {
BufferFileConfig.BUFFER_OFFSET_MAX_FILE_SIZE = 10 * 1024 * 1024;
}
} else {
BufferFileConfig.BUFFER_OFFSET_MAX_FILE_SIZE = 10 * 1024 * 1024;
}
if (groupConfig.containsKey(BUFFER_SEGMENT_MAX_FILE_SIZE)) {
String sizeStr = groupConfig.get(BUFFER_SEGMENT_MAX_FILE_SIZE).toUpperCase();
if (sizeStr.endsWith("K")) {
int size = Integer.parseInt(sizeStr.replace("K", ""));
BufferFileConfig.BUFFER_SEGMENT_MAX_FILE_SIZE = size * 1024;
} else if (sizeStr.endsWith("KB")) {
int size = Integer.parseInt(sizeStr.replace("KB", ""));
BufferFileConfig.BUFFER_SEGMENT_MAX_FILE_SIZE = size * 1024;
} else if (sizeStr.endsWith("M")) {
int size = Integer.parseInt(sizeStr.replace("M", ""));
BufferFileConfig.BUFFER_SEGMENT_MAX_FILE_SIZE = size * 1024 * 1024;
} else if (sizeStr.endsWith("MB")) {
int size = Integer.parseInt(sizeStr.replace("MB", ""));
BufferFileConfig.BUFFER_SEGMENT_MAX_FILE_SIZE = size * 1024 * 1024;
} else {
BufferFileConfig.BUFFER_SEGMENT_MAX_FILE_SIZE = 1024 * 1024;
}
} else {
BufferFileConfig.BUFFER_SEGMENT_MAX_FILE_SIZE = 1024 * 1024;
}
}
}
}
......@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.agentstream;
import org.skywalking.apm.collector.core.config.GroupConfigParser;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
......@@ -45,4 +46,8 @@ public class AgentStreamModuleGroupDefine implements ModuleGroupDefine {
@Override public ModuleInstaller moduleInstaller() {
return installer;
}
@Override public GroupConfigParser groupConfigParser() {
return new AgentStreamModuleGroupConfigParser();
}
}
......@@ -24,13 +24,13 @@ import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.MultipleCommonModuleInstaller;
import org.skywalking.apm.collector.core.module.MultipleModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerException;
/**
* @author pengys5
*/
public class AgentStreamModuleInstaller extends MultipleCommonModuleInstaller {
public class AgentStreamModuleInstaller extends MultipleModuleInstaller {
@Override public String groupName() {
return AgentStreamModuleGroupDefine.GROUP_NAME;
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agentstream.config;
/**
* @author pengys5
*/
public class BufferFileConfig {
public static int BUFFER_OFFSET_MAX_FILE_SIZE = 10 * 1024 * 1024;
public static int BUFFER_SEGMENT_MAX_FILE_SIZE = 10 * 1024 * 1024;
}
......@@ -18,15 +18,11 @@
package org.skywalking.apm.collector.agentstream.grpc.handler;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.segment.SegmentParse;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.network.proto.TraceSegmentServiceGrpc;
import org.skywalking.apm.network.proto.UniqueId;
import org.skywalking.apm.network.proto.UpstreamSegment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -43,13 +39,7 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg
@Override public void onNext(UpstreamSegment segment) {
logger.debug("receive segment");
SegmentParse segmentParse = new SegmentParse();
try {
List<UniqueId> traceIds = segment.getGlobalTraceIdsList();
TraceSegmentObject segmentObject = TraceSegmentObject.parseFrom(segment.getSegment());
segmentParse.parse(traceIds, segmentObject);
} catch (InvalidProtocolBufferException e) {
logger.error(e.getMessage(), e);
}
segmentParse.parse(segment, SegmentParse.Source.Agent);
}
@Override public void onError(Throwable throwable) {
......
......@@ -66,7 +66,7 @@ public class TraceSegmentServletHandler extends JettyHandler {
while (reader.hasNext()) {
SegmentParse segmentParse = new SegmentParse();
TraceSegment traceSegment = jsonReader.read(reader);
segmentParse.parse(traceSegment.getGlobalTraceIds(), traceSegment.getTraceSegmentObject());
segmentParse.parse(traceSegment.getUpstreamSegment(), SegmentParse.Source.Agent);
}
reader.endArray();
}
......
......@@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class SegmentJsonReader implements StreamJsonReader<TraceSegmentObject> {
public class SegmentJsonReader implements StreamJsonReader<TraceSegmentObject.Builder> {
private final Logger logger = LoggerFactory.getLogger(SegmentJsonReader.class);
......@@ -41,7 +41,7 @@ public class SegmentJsonReader implements StreamJsonReader<TraceSegmentObject> {
private static final String TRACE_SEGMENT_REFERENCE = "rs";
private static final String SPANS = "ss";
@Override public TraceSegmentObject read(JsonReader reader) throws IOException {
@Override public TraceSegmentObject.Builder read(JsonReader reader) throws IOException {
TraceSegmentObject.Builder builder = TraceSegmentObject.newBuilder();
reader.beginObject();
......@@ -82,6 +82,6 @@ public class SegmentJsonReader implements StreamJsonReader<TraceSegmentObject> {
}
reader.endObject();
return builder.build();
return builder;
}
}
......@@ -18,36 +18,30 @@
package org.skywalking.apm.collector.agentstream.jetty.handler.reader;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.network.proto.UniqueId;
import org.skywalking.apm.network.proto.UpstreamSegment;
/**
* @author pengys5
*/
public class TraceSegment {
private List<UniqueId> uniqueIds;
private TraceSegmentObject traceSegmentObject;
private UpstreamSegment.Builder builder;
public TraceSegment() {
uniqueIds = new ArrayList<>();
builder = UpstreamSegment.newBuilder();
}
public List<UniqueId> getGlobalTraceIds() {
return uniqueIds;
public void addGlobalTraceId(UniqueId.Builder globalTraceId) {
builder.addGlobalTraceIds(globalTraceId);
}
public void addGlobalTraceId(UniqueId globalTraceId) {
uniqueIds.add(globalTraceId);
public void setTraceSegmentBuilder(TraceSegmentObject.Builder traceSegmentBuilder) {
builder.setSegment(traceSegmentBuilder.build().toByteString());
}
public TraceSegmentObject getTraceSegmentObject() {
return traceSegmentObject;
}
public void setTraceSegmentObject(TraceSegmentObject traceSegmentObject) {
this.traceSegmentObject = traceSegmentObject;
public UpstreamSegment getUpstreamSegment() {
return builder.build();
}
}
......@@ -49,16 +49,9 @@ public class TraceSegmentJsonReader implements StreamJsonReader<TraceSegment> {
}
reader.endArray();
if (logger.isDebugEnabled()) {
traceSegment.getGlobalTraceIds().forEach(uniqueId -> {
StringBuilder globalTraceId = new StringBuilder();
uniqueId.getIdPartsList().forEach(idPart -> globalTraceId.append(idPart));
logger.debug("global trace id: {}", globalTraceId.toString());
});
}
break;
case SEGMENT:
traceSegment.setTraceSegmentObject(segmentJsonReader.read(reader));
traceSegment.setTraceSegmentBuilder(segmentJsonReader.read(reader));
break;
default:
reader.skipValue();
......
......@@ -25,9 +25,9 @@ import org.skywalking.apm.network.proto.UniqueId;
/**
* @author pengys5
*/
public class UniqueIdJsonReader implements StreamJsonReader<UniqueId> {
public class UniqueIdJsonReader implements StreamJsonReader<UniqueId.Builder> {
@Override public UniqueId read(JsonReader reader) throws IOException {
@Override public UniqueId.Builder read(JsonReader reader) throws IOException {
UniqueId.Builder builder = UniqueId.newBuilder();
reader.beginArray();
......@@ -35,6 +35,6 @@ public class UniqueIdJsonReader implements StreamJsonReader<UniqueId> {
builder.addIdParts(reader.nextLong());
}
reader.endArray();
return builder.build();
return builder;
}
}
......@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.GlobalTraceIdsListener;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.SpanDecorator;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
......@@ -30,7 +31,6 @@ import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.UniqueId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -47,8 +47,9 @@ public class GlobalTraceSpanListener implements FirstSpanListener, GlobalTraceId
private long timeBucket;
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
this.timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
public void parseFirst(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
this.timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime());
this.segmentId = segmentId;
}
......
......@@ -20,6 +20,7 @@ package org.skywalking.apm.collector.agentstream.worker.instance.performance;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.SpanDecorator;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
......@@ -28,7 +29,6 @@ import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -45,15 +45,17 @@ public class InstPerformanceSpanListener implements EntrySpanListener, FirstSpan
private long timeBucket;
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
public void parseEntry(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
}
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
public void parseFirst(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
this.applicationId = applicationId;
this.instanceId = applicationInstanceId;
this.cost = spanObject.getEndTime() - spanObject.getStartTime();
timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(spanObject.getStartTime());
this.cost = spanDecorator.getEndTime() - spanDecorator.getStartTime();
timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(spanDecorator.getStartTime());
}
@Override public void build() {
......
......@@ -23,6 +23,7 @@ import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.SpanDecorator;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
......@@ -31,7 +32,6 @@ import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -46,39 +46,34 @@ public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanLis
private long timeBucket;
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
public void parseExit(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId, String segmentId) {
NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
nodeComponent.setComponentId(spanObject.getComponentId());
nodeComponent.setComponentId(spanDecorator.getComponentId());
String id;
if (spanObject.getComponentId() == 0) {
nodeComponent.setComponentName(spanObject.getComponent());
if (spanDecorator.getComponentId() == 0) {
nodeComponent.setComponentName(spanDecorator.getComponent());
id = nodeComponent.getComponentName();
} else {
nodeComponent.setComponentName(Const.EMPTY_STRING);
id = String.valueOf(nodeComponent.getComponentId());
}
nodeComponent.setPeerId(spanObject.getPeerId());
if (spanObject.getPeerId() == 0) {
nodeComponent.setPeer(spanObject.getPeer());
id = id + Const.ID_SPLIT + nodeComponent.getPeer();
} else {
nodeComponent.setPeer(Const.EMPTY_STRING);
id = id + Const.ID_SPLIT + nodeComponent.getPeerId();
}
nodeComponent.setPeerId(spanDecorator.getPeerId());
id = id + Const.ID_SPLIT + nodeComponent.getPeerId();
nodeComponent.setId(id);
nodeComponents.add(nodeComponent);
}
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
public void parseEntry(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
nodeComponent.setComponentId(spanObject.getComponentId());
nodeComponent.setComponentId(spanDecorator.getComponentId());
String id;
if (spanObject.getComponentId() == 0) {
nodeComponent.setComponentName(spanObject.getComponent());
if (spanDecorator.getComponentId() == 0) {
nodeComponent.setComponentName(spanDecorator.getComponent());
id = nodeComponent.getComponentName();
} else {
id = String.valueOf(nodeComponent.getComponentId());
......@@ -86,7 +81,6 @@ public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanLis
}
nodeComponent.setPeerId(applicationId);
nodeComponent.setPeer(Const.EMPTY_STRING);
id = id + Const.ID_SPLIT + String.valueOf(applicationId);
nodeComponent.setId(id);
......@@ -94,8 +88,9 @@ public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanLis
}
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
public void parseFirst(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime());
}
@Override public void build() {
......
......@@ -22,6 +22,8 @@ import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.ReferenceDecorator;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.SpanDecorator;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
......@@ -30,8 +32,6 @@ import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.TraceSegmentReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -45,29 +45,21 @@ public class NodeMappingSpanListener implements RefsListener, FirstSpanListener
private List<NodeMappingDataDefine.NodeMapping> nodeMappings = new ArrayList<>();
private long timeBucket;
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId,
@Override public void parseRef(ReferenceDecorator referenceDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
logger.debug("node mapping listener parse reference");
NodeMappingDataDefine.NodeMapping nodeMapping = new NodeMappingDataDefine.NodeMapping();
nodeMapping.setApplicationId(applicationId);
nodeMapping.setAddressId(reference.getNetworkAddressId());
String id = String.valueOf(applicationId);
if (reference.getNetworkAddressId() != 0) {
nodeMapping.setAddress(Const.EMPTY_STRING);
id = id + Const.ID_SPLIT + String.valueOf(nodeMapping.getAddressId());
} else {
id = id + Const.ID_SPLIT + reference.getNetworkAddress();
nodeMapping.setAddress(reference.getNetworkAddress());
}
nodeMapping.setAddressId(referenceDecorator.getNetworkAddressId());
String id = String.valueOf(applicationId) + Const.ID_SPLIT + String.valueOf(nodeMapping.getAddressId());
nodeMapping.setId(id);
nodeMappings.add(nodeMapping);
}
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
public void parseFirst(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime());
}
@Override public void build() {
......
......@@ -23,6 +23,8 @@ import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.ReferenceDecorator;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.SpanDecorator;
import org.skywalking.apm.collector.cache.InstanceCache;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.CollectionUtils;
......@@ -33,8 +35,6 @@ import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.TraceSegmentReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -49,59 +49,53 @@ public class NodeReferenceSpanListener implements EntrySpanListener, ExitSpanLis
private List<NodeReferenceDataDefine.NodeReference> references = new LinkedList<>();
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
public void parseExit(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId, String segmentId) {
NodeReferenceDataDefine.NodeReference nodeReference = new NodeReferenceDataDefine.NodeReference();
nodeReference.setFrontApplicationId(applicationId);
nodeReference.setBehindApplicationId(spanObject.getPeerId());
nodeReference.setTimeBucket(TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime()));
nodeReference.setBehindApplicationId(spanDecorator.getPeerId());
nodeReference.setTimeBucket(TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime()));
StringBuilder idBuilder = new StringBuilder();
idBuilder.append(nodeReference.getTimeBucket()).append(Const.ID_SPLIT).append(applicationId);
if (spanObject.getPeerId() != 0) {
nodeReference.setBehindPeer(Const.EMPTY_STRING);
idBuilder.append(Const.ID_SPLIT).append(spanObject.getPeerId());
} else {
nodeReference.setBehindPeer(spanObject.getPeer());
idBuilder.append(Const.ID_SPLIT).append(spanObject.getPeer());
}
idBuilder.append(nodeReference.getTimeBucket()).append(Const.ID_SPLIT).append(applicationId)
.append(Const.ID_SPLIT).append(spanDecorator.getPeerId());
nodeReference.setId(idBuilder.toString());
nodeReferences.add(buildNodeRefSum(nodeReference, spanObject.getStartTime(), spanObject.getEndTime(), spanObject.getIsError()));
nodeReferences.add(buildNodeRefSum(nodeReference, spanDecorator.getStartTime(), spanDecorator.getEndTime(), spanDecorator.getIsError()));
}
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
public void parseEntry(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
if (CollectionUtils.isNotEmpty(references)) {
references.forEach(nodeReference -> {
nodeReference.setTimeBucket(TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime()));
nodeReference.setTimeBucket(TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime()));
String idBuilder = String.valueOf(nodeReference.getTimeBucket()) + Const.ID_SPLIT + nodeReference.getFrontApplicationId() +
Const.ID_SPLIT + nodeReference.getBehindApplicationId();
nodeReference.setId(idBuilder);
nodeReferences.add(buildNodeRefSum(nodeReference, spanObject.getStartTime(), spanObject.getEndTime(), spanObject.getIsError()));
nodeReferences.add(buildNodeRefSum(nodeReference, spanDecorator.getStartTime(), spanDecorator.getEndTime(), spanDecorator.getIsError()));
});
} else {
NodeReferenceDataDefine.NodeReference nodeReference = new NodeReferenceDataDefine.NodeReference();
nodeReference.setFrontApplicationId(Const.USER_ID);
nodeReference.setBehindApplicationId(applicationId);
nodeReference.setBehindPeer(Const.EMPTY_STRING);
nodeReference.setTimeBucket(TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime()));
nodeReference.setTimeBucket(TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime()));
String idBuilder = String.valueOf(nodeReference.getTimeBucket()) + Const.ID_SPLIT + nodeReference.getFrontApplicationId() +
Const.ID_SPLIT + nodeReference.getBehindApplicationId();
nodeReference.setId(idBuilder);
nodeReferences.add(buildNodeRefSum(nodeReference, spanObject.getStartTime(), spanObject.getEndTime(), spanObject.getIsError()));
nodeReferences.add(buildNodeRefSum(nodeReference, spanDecorator.getStartTime(), spanDecorator.getEndTime(), spanDecorator.getIsError()));
}
}
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId,
@Override public void parseRef(ReferenceDecorator referenceDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
int parentApplicationId = InstanceCache.get(reference.getParentApplicationInstanceId());
int parentApplicationId = InstanceCache.get(referenceDecorator.getParentApplicationInstanceId());
NodeReferenceDataDefine.NodeReference referenceSum = new NodeReferenceDataDefine.NodeReference();
referenceSum.setFrontApplicationId(parentApplicationId);
referenceSum.setBehindApplicationId(applicationId);
referenceSum.setBehindPeer(Const.EMPTY_STRING);
references.add(referenceSum);
}
......
......@@ -18,11 +18,11 @@
package org.skywalking.apm.collector.agentstream.worker.segment;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.SpanDecorator;
/**
* @author pengys5
*/
public interface EntrySpanListener extends SpanListener {
void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId);
void parseEntry(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId, String segmentId);
}
......@@ -18,11 +18,11 @@
package org.skywalking.apm.collector.agentstream.worker.segment;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.SpanDecorator;
/**
* @author pengys5
*/
public interface ExitSpanListener extends SpanListener {
void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId);
void parseExit(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId, String segmentId);
}
......@@ -18,11 +18,11 @@
package org.skywalking.apm.collector.agentstream.worker.segment;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.SpanDecorator;
/**
* @author pengys5
*/
public interface FirstSpanListener extends SpanListener {
void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId);
void parseFirst(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId, String segmentId);
}
......@@ -18,11 +18,11 @@
package org.skywalking.apm.collector.agentstream.worker.segment;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.SpanDecorator;
/**
* @author pengys5
*/
public interface LocalSpanListener extends SpanListener {
void parseLocal(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId);
void parseLocal(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId, String segmentId);
}
......@@ -18,11 +18,11 @@
package org.skywalking.apm.collector.agentstream.worker.segment;
import org.skywalking.apm.network.proto.TraceSegmentReference;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.ReferenceDecorator;
/**
* @author pengys5
*/
public interface RefsListener extends SpanListener {
void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId, String segmentId);
void parseRef(ReferenceDecorator referenceDecorator, int applicationId, int applicationInstanceId, String segmentId);
}
......@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.agentstream.worker.segment;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.global.GlobalTraceSpanListener;
......@@ -27,20 +28,24 @@ import org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingS
import org.skywalking.apm.collector.agentstream.worker.noderef.NodeReferenceSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.SegmentCostSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.origin.SegmentPersistenceWorker;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.ReferenceDecorator;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.ReferenceIdExchanger;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.SegmentDecorator;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.SegmentStandardizationWorker;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.SpanDecorator;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.SpanIdExchanger;
import org.skywalking.apm.collector.agentstream.worker.service.entry.ServiceEntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.serviceref.ServiceReferenceSpanListener;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.storage.define.segment.SegmentDataDefine;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.SpanType;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.network.proto.TraceSegmentReference;
import org.skywalking.apm.network.proto.UniqueId;
import org.skywalking.apm.network.proto.UpstreamSegment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -52,6 +57,7 @@ public class SegmentParse {
private final Logger logger = LoggerFactory.getLogger(SegmentParse.class);
private List<SpanListener> spanListeners;
private String segmentId;
public SegmentParse() {
spanListeners = new ArrayList<>();
......@@ -65,51 +71,83 @@ public class SegmentParse {
spanListeners.add(new InstPerformanceSpanListener());
}
public void parse(List<UniqueId> traceIds, TraceSegmentObject segmentObject) {
public boolean parse(UpstreamSegment segment, Source source) {
try {
List<UniqueId> traceIds = segment.getGlobalTraceIdsList();
TraceSegmentObject segmentObject = TraceSegmentObject.parseFrom(segment.getSegment());
SegmentDecorator segmentDecorator = new SegmentDecorator(segmentObject);
if (!preBuild(traceIds, segmentDecorator)) {
logger.debug("This segment id exchange not success, write to buffer file, id: {}", segmentId);
if (source.equals(Source.Agent)) {
writeToBufferFile(segmentId, segment);
}
return false;
} else {
logger.debug("This segment id exchange success, id: {}", segmentId);
notifyListenerToBuild();
buildSegment(segmentId, segmentDecorator.toByteArray());
return true;
}
} catch (InvalidProtocolBufferException e) {
logger.error(e.getMessage(), e);
}
return false;
}
private boolean preBuild(List<UniqueId> traceIds, SegmentDecorator segmentDecorator) {
StringBuilder segmentIdBuilder = new StringBuilder();
for (int i = 0; i < segmentObject.getTraceSegmentId().getIdPartsList().size(); i++) {
for (int i = 0; i < segmentDecorator.getTraceSegmentId().getIdPartsList().size(); i++) {
if (i == 0) {
segmentIdBuilder.append(segmentObject.getTraceSegmentId().getIdPartsList().get(i));
segmentIdBuilder.append(segmentDecorator.getTraceSegmentId().getIdPartsList().get(i));
} else {
segmentIdBuilder.append(".").append(segmentObject.getTraceSegmentId().getIdPartsList().get(i));
segmentIdBuilder.append(".").append(segmentDecorator.getTraceSegmentId().getIdPartsList().get(i));
}
}
String segmentId = segmentIdBuilder.toString();
segmentId = segmentIdBuilder.toString();
for (UniqueId uniqueId : traceIds) {
notifyGlobalsListener(uniqueId);
}
int applicationId = segmentObject.getApplicationId();
int applicationInstanceId = segmentObject.getApplicationInstanceId();
int applicationId = segmentDecorator.getApplicationId();
int applicationInstanceId = segmentDecorator.getApplicationInstanceId();
for (int i = 0; i < segmentDecorator.getRefsCount(); i++) {
ReferenceDecorator referenceDecorator = segmentDecorator.getRefs(i);
if (!ReferenceIdExchanger.getInstance().exchange(referenceDecorator, applicationId)) {
return false;
}
for (TraceSegmentReference reference : segmentObject.getRefsList()) {
notifyRefsListener(reference, applicationId, applicationInstanceId, segmentId);
notifyRefsListener(referenceDecorator, applicationId, applicationInstanceId, segmentId);
}
List<SpanObject> spans = segmentObject.getSpansList();
if (CollectionUtils.isNotEmpty(spans)) {
for (SpanObject spanObject : spans) {
if (spanObject.getSpanId() == 0) {
notifyFirstListener(spanObject, applicationId, applicationInstanceId, segmentId);
}
for (int i = 0; i < segmentDecorator.getSpansCount(); i++) {
SpanDecorator spanDecorator = segmentDecorator.getSpans(i);
if (SpanType.Exit.equals(spanObject.getSpanType())) {
notifyExitListener(spanObject, applicationId, applicationInstanceId, segmentId);
} else if (SpanType.Entry.equals(spanObject.getSpanType())) {
notifyEntryListener(spanObject, applicationId, applicationInstanceId, segmentId);
} else if (SpanType.Local.equals(spanObject.getSpanType())) {
notifyLocalListener(spanObject, applicationId, applicationInstanceId, segmentId);
} else {
logger.error("span type error, span type: {}", spanObject.getSpanType().name());
}
if (!SpanIdExchanger.getInstance().exchange(spanDecorator, applicationId)) {
return false;
}
if (spanDecorator.getSpanId() == 0) {
notifyFirstListener(spanDecorator, applicationId, applicationInstanceId, segmentId);
}
if (SpanType.Exit.equals(spanDecorator.getSpanType())) {
notifyExitListener(spanDecorator, applicationId, applicationInstanceId, segmentId);
} else if (SpanType.Entry.equals(spanDecorator.getSpanType())) {
notifyEntryListener(spanDecorator, applicationId, applicationInstanceId, segmentId);
} else if (SpanType.Local.equals(spanDecorator.getSpanType())) {
notifyLocalListener(spanDecorator, applicationId, applicationInstanceId, segmentId);
} else {
logger.error("span type value was unexpected, span type name: {}", spanDecorator.getSpanType().name());
}
}
notifyListenerToBuild();
buildSegment(segmentId, segmentObject.toByteArray());
return true;
}
private void buildSegment(String id, byte[] dataBinary) {
......@@ -126,47 +164,58 @@ public class SegmentParse {
}
}
private void writeToBufferFile(String id, UpstreamSegment upstreamSegment) {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
try {
logger.debug("send to segment buffer write worker, id: {}", id);
context.getClusterWorkerContext().lookup(SegmentStandardizationWorker.WorkerRole.INSTANCE).tell(upstreamSegment);
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
private void notifyListenerToBuild() {
spanListeners.forEach(SpanListener::build);
}
private void notifyExitListener(SpanObject spanObject, int applicationId, int applicationInstanceId,
private void notifyExitListener(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
for (SpanListener listener : spanListeners) {
if (listener instanceof ExitSpanListener) {
((ExitSpanListener)listener).parseExit(spanObject, applicationId, applicationInstanceId, segmentId);
((ExitSpanListener)listener).parseExit(spanDecorator, applicationId, applicationInstanceId, segmentId);
}
}
}
private void notifyEntryListener(SpanObject spanObject, int applicationId, int applicationInstanceId,
private void notifyEntryListener(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
for (SpanListener listener : spanListeners) {
if (listener instanceof EntrySpanListener) {
((EntrySpanListener)listener).parseEntry(spanObject, applicationId, applicationInstanceId, segmentId);
((EntrySpanListener)listener).parseEntry(spanDecorator, applicationId, applicationInstanceId, segmentId);
}
}
}
private void notifyLocalListener(SpanObject spanObject, int applicationId, int applicationInstanceId,
private void notifyLocalListener(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
for (SpanListener listener : spanListeners) {
if (listener instanceof LocalSpanListener) {
((LocalSpanListener)listener).parseLocal(spanObject, applicationId, applicationInstanceId, segmentId);
((LocalSpanListener)listener).parseLocal(spanDecorator, applicationId, applicationInstanceId, segmentId);
}
}
}
private void notifyFirstListener(SpanObject spanObject, int applicationId, int applicationInstanceId,
private void notifyFirstListener(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
for (SpanListener listener : spanListeners) {
if (listener instanceof FirstSpanListener) {
((FirstSpanListener)listener).parseFirst(spanObject, applicationId, applicationInstanceId, segmentId);
((FirstSpanListener)listener).parseFirst(spanDecorator, applicationId, applicationInstanceId, segmentId);
}
}
}
private void notifyRefsListener(TraceSegmentReference reference, int applicationId, int applicationInstanceId,
private void notifyRefsListener(ReferenceDecorator reference, int applicationId, int applicationInstanceId,
String segmentId) {
for (SpanListener listener : spanListeners) {
if (listener instanceof RefsListener) {
......@@ -182,4 +231,8 @@ public class SegmentParse {
}
}
}
public enum Source {
Agent, Buffer
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agentstream.worker.segment.buffer;
/**
* @author pengys5
*/
public class Offset {
private static final String SPLIT_CHARACTER = ",";
private ReadOffset readOffset;
private WriteOffset writeOffset;
public Offset() {
readOffset = new ReadOffset();
writeOffset = new WriteOffset();
}
public String serialize() {
return readOffset.getReadFileName() + SPLIT_CHARACTER + String.valueOf(readOffset.getReadFileOffset())
+ SPLIT_CHARACTER + writeOffset.getWriteFileName() + SPLIT_CHARACTER + String.valueOf(writeOffset.getWriteFileOffset());
}
public void deserialize(String value) {
String[] values = value.split(SPLIT_CHARACTER);
if (values.length == 4) {
this.readOffset.readFileName = values[0];
this.readOffset.readFileOffset = Long.parseLong(values[1]);
this.writeOffset.writeFileName = values[2];
this.writeOffset.writeFileOffset = Long.parseLong(values[3]);
}
}
public ReadOffset getReadOffset() {
return readOffset;
}
public WriteOffset getWriteOffset() {
return writeOffset;
}
public static class ReadOffset {
private String readFileName;
private long readFileOffset = 0;
public String getReadFileName() {
return readFileName;
}
public long getReadFileOffset() {
return readFileOffset;
}
public void setReadFileName(String readFileName) {
this.readFileName = readFileName;
}
public void setReadFileOffset(long readFileOffset) {
this.readFileOffset = readFileOffset;
}
}
public static class WriteOffset {
private String writeFileName;
private long writeFileOffset = 0;
public String getWriteFileName() {
return writeFileName;
}
public long getWriteFileOffset() {
return writeFileOffset;
}
public void setWriteFileName(String writeFileName) {
this.writeFileName = writeFileName;
}
public void setWriteFileOffset(long writeFileOffset) {
this.writeFileOffset = writeFileOffset;
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agentstream.worker.segment.buffer;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.collector.agentstream.config.BufferFileConfig;
import org.skywalking.apm.collector.agentstream.worker.util.FileUtils;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public enum OffsetManager {
INSTANCE;
private final Logger logger = LoggerFactory.getLogger(OffsetManager.class);
private static final String OFFSET_FILE_PREFIX = "offset";
private File offsetFile;
private Offset offset;
private boolean initialized = false;
private RandomAccessFile randomAccessFile = null;
private String lastOffsetRecord = Const.EMPTY_STRING;
public synchronized void initialize() throws IOException {
if (!initialized) {
this.offset = new Offset();
File dataPath = new File(SegmentBufferConfig.BUFFER_PATH);
if (dataPath.mkdirs()) {
createOffsetFile();
} else {
File[] offsetFiles = dataPath.listFiles(new PrefixFileNameFilter());
if (CollectionUtils.isNotEmpty(offsetFiles) && offsetFiles.length > 0) {
for (int i = 0; i < offsetFiles.length; i++) {
if (i != offsetFiles.length - 1) {
offsetFiles[i].delete();
} else {
offsetFile = offsetFiles[i];
}
}
} else {
createOffsetFile();
}
}
String offsetRecord = FileUtils.INSTANCE.readLastLine(offsetFile);
offset.deserialize(offsetRecord);
initialized = true;
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> flush(), 10, 3, TimeUnit.SECONDS);
}
}
private void createOffsetFile() throws IOException {
String timeBucket = String.valueOf(TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis()));
String offsetFileName = OFFSET_FILE_PREFIX + "_" + timeBucket + "." + Const.FILE_SUFFIX;
offsetFile = new File(SegmentBufferConfig.BUFFER_PATH + offsetFileName);
this.offset.getWriteOffset().setWriteFileName(Const.EMPTY_STRING);
this.offset.getWriteOffset().setWriteFileOffset(0);
this.offset.getReadOffset().setReadFileName(Const.EMPTY_STRING);
this.offset.getReadOffset().setReadFileOffset(0);
this.flush();
}
public void flush() {
String offsetRecord = offset.serialize();
if (!lastOffsetRecord.equals(offsetRecord)) {
if (offsetFile.length() >= BufferFileConfig.BUFFER_OFFSET_MAX_FILE_SIZE) {
exchangeFile();
}
FileUtils.INSTANCE.writeAppendToLast(offsetFile, randomAccessFile, offsetRecord);
lastOffsetRecord = offsetRecord;
}
}
private void exchangeFile() {
String timeBucket = String.valueOf(TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis()));
String offsetFileName = OFFSET_FILE_PREFIX + "_" + timeBucket + "." + Const.FILE_SUFFIX;
File newOffsetFile = new File(SegmentBufferConfig.BUFFER_PATH + offsetFileName);
offsetFile.delete();
offsetFile = newOffsetFile;
this.flush();
}
public String getReadFileName() {
return offset.getReadOffset().getReadFileName();
}
public long getReadFileOffset() {
return offset.getReadOffset().getReadFileOffset();
}
public void setReadOffset(long readFileOffset) {
offset.getReadOffset().setReadFileOffset(readFileOffset);
}
public void setReadOffset(String readFileName, long readFileOffset) {
offset.getReadOffset().setReadFileName(readFileName);
offset.getReadOffset().setReadFileOffset(readFileOffset);
}
public String getWriteFileName() {
return offset.getWriteOffset().getWriteFileName();
}
public long getWriteFileOffset() {
return offset.getWriteOffset().getWriteFileOffset();
}
public void setWriteOffset(String writeFileName, long writeFileOffset) {
offset.getWriteOffset().setWriteFileName(writeFileName);
offset.getWriteOffset().setWriteFileOffset(writeFileOffset);
}
public void setWriteOffset(long writeFileOffset) {
offset.getWriteOffset().setWriteFileOffset(writeFileOffset);
}
class PrefixFileNameFilter implements FilenameFilter {
@Override public boolean accept(File dir, String name) {
if (name.startsWith(OFFSET_FILE_PREFIX)) {
return true;
} else {
return false;
}
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agentstream.worker.segment.buffer;
import org.skywalking.apm.collector.core.config.SystemConfig;
/**
* @author pengys5
*/
public class SegmentBufferConfig {
public static String BUFFER_PATH = SystemConfig.DATA_PATH + "/buffer/";
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agentstream.worker.segment.buffer;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import org.skywalking.apm.collector.agentstream.config.BufferFileConfig;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.network.proto.UpstreamSegment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public enum SegmentBufferManager {
INSTANCE;
private final Logger logger = LoggerFactory.getLogger(SegmentBufferManager.class);
public static final String DATA_FILE_PREFIX = "data";
private FileOutputStream outputStream;
public synchronized void initialize() {
logger.info("segment buffer initialize");
try {
OffsetManager.INSTANCE.initialize();
if (new File(SegmentBufferConfig.BUFFER_PATH).mkdirs()) {
newDataFile();
} else {
String writeFileName = OffsetManager.INSTANCE.getWriteFileName();
if (StringUtils.isNotEmpty(writeFileName)) {
File dataFile = new File(SegmentBufferConfig.BUFFER_PATH + writeFileName);
if (dataFile.exists()) {
outputStream = new FileOutputStream(new File(SegmentBufferConfig.BUFFER_PATH + writeFileName), true);
} else {
newDataFile();
}
} else {
newDataFile();
}
}
SegmentBufferReader.INSTANCE.initialize();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
public synchronized void writeBuffer(UpstreamSegment segment) {
try {
segment.writeDelimitedTo(outputStream);
long position = outputStream.getChannel().position();
if (position > BufferFileConfig.BUFFER_SEGMENT_MAX_FILE_SIZE) {
newDataFile();
} else {
OffsetManager.INSTANCE.setWriteOffset(position);
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
private void newDataFile() throws IOException {
logger.debug("create new segment buffer file");
String timeBucket = String.valueOf(TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis()));
String writeFileName = DATA_FILE_PREFIX + "_" + timeBucket + "." + Const.FILE_SUFFIX;
File dataFile = new File(SegmentBufferConfig.BUFFER_PATH + writeFileName);
dataFile.createNewFile();
OffsetManager.INSTANCE.setWriteOffset(writeFileName, 0);
try {
if (outputStream != null) {
outputStream.close();
}
outputStream = new FileOutputStream(dataFile);
outputStream.getChannel().position(0);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
public synchronized void flush() {
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agentstream.worker.segment.buffer;
import com.google.protobuf.CodedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.collector.agentstream.worker.segment.SegmentParse;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.network.proto.UpstreamSegment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public enum SegmentBufferReader {
INSTANCE;
private final Logger logger = LoggerFactory.getLogger(SegmentBufferReader.class);
private InputStream inputStream;
public void initialize() {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::preRead, 3, 3, TimeUnit.SECONDS);
}
private void preRead() {
String readFileName = OffsetManager.INSTANCE.getReadFileName();
if (StringUtils.isNotEmpty(readFileName)) {
File readFile = new File(SegmentBufferConfig.BUFFER_PATH + readFileName);
if (readFile.exists()) {
deleteTheDataFilesBeforeReadFile(readFileName);
long readFileOffset = OffsetManager.INSTANCE.getReadFileOffset();
read(readFile, readFileOffset);
readEarliestCreateDataFile();
} else {
deleteTheDataFilesBeforeReadFile(readFileName);
readEarliestCreateDataFile();
}
} else {
readEarliestCreateDataFile();
}
}
private void deleteTheDataFilesBeforeReadFile(String readFileName) {
File[] dataFiles = new File(SegmentBufferConfig.BUFFER_PATH).listFiles(new PrefixFileNameFilter());
long readFileCreateTime = getFileCreateTime(readFileName);
for (File dataFile : dataFiles) {
long fileCreateTime = getFileCreateTime(dataFile.getName());
if (fileCreateTime < readFileCreateTime) {
dataFile.delete();
} else if (fileCreateTime == readFileCreateTime) {
break;
}
}
}
private long getFileCreateTime(String fileName) {
fileName = fileName.replace(SegmentBufferManager.DATA_FILE_PREFIX + "_", Const.EMPTY_STRING);
fileName = fileName.replace("." + Const.FILE_SUFFIX, Const.EMPTY_STRING);
return Long.valueOf(fileName);
}
private void readEarliestCreateDataFile() {
String readFileName = OffsetManager.INSTANCE.getReadFileName();
File[] dataFiles = new File(SegmentBufferConfig.BUFFER_PATH).listFiles(new PrefixFileNameFilter());
if (CollectionUtils.isNotEmpty(dataFiles)) {
if (dataFiles[0].getName().equals(readFileName)) {
return;
}
}
for (File dataFile : dataFiles) {
logger.debug("Reading segment buffer data file, file name: {}", dataFile.getAbsolutePath());
OffsetManager.INSTANCE.setReadOffset(dataFile.getName(), 0);
if (!read(dataFile, 0)) {
break;
}
}
}
private boolean read(File readFile, long readFileOffset) {
try {
inputStream = new FileInputStream(readFile);
inputStream.skip(readFileOffset);
String writeFileName = OffsetManager.INSTANCE.getWriteFileName();
long endPoint = readFile.length();
if (writeFileName.equals(readFile.getName())) {
endPoint = OffsetManager.INSTANCE.getWriteFileOffset();
}
while (readFile.length() > readFileOffset && readFileOffset < endPoint) {
UpstreamSegment upstreamSegment = UpstreamSegment.parser().parseDelimitedFrom(inputStream);
SegmentParse parse = new SegmentParse();
if (!parse.parse(upstreamSegment, SegmentParse.Source.Buffer)) {
return false;
}
final int serialized = upstreamSegment.getSerializedSize();
readFileOffset = readFileOffset + CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized;
logger.debug("read segment buffer from file: {}, offset: {}, file length: {}", readFile.getName(), readFileOffset, readFile.length());
OffsetManager.INSTANCE.setReadOffset(readFileOffset);
}
inputStream.close();
if (!writeFileName.equals(readFile.getName())) {
readFile.delete();
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
return false;
}
return true;
}
class PrefixFileNameFilter implements FilenameFilter {
@Override public boolean accept(File dir, String name) {
if (name.startsWith(SegmentBufferManager.DATA_FILE_PREFIX)) {
return true;
} else {
return false;
}
}
}
}
......@@ -24,6 +24,7 @@ import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.LocalSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.SpanDecorator;
import org.skywalking.apm.collector.cache.ServiceNameCache;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
......@@ -32,14 +33,13 @@ import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListener, LocalSpanListener,FirstSpanListener {
public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListener, LocalSpanListener, FirstSpanListener {
private final Logger logger = LoggerFactory.getLogger(SegmentCostSpanListener.class);
......@@ -48,39 +48,42 @@ public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListe
private long timeBucket;
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
public void parseFirst(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime());
SegmentCostDataDefine.SegmentCost segmentCost = new SegmentCostDataDefine.SegmentCost();
segmentCost.setSegmentId(segmentId);
segmentCost.setApplicationId(applicationId);
segmentCost.setCost(spanObject.getEndTime() - spanObject.getStartTime());
segmentCost.setStartTime(spanObject.getStartTime());
segmentCost.setEndTime(spanObject.getEndTime());
segmentCost.setCost(spanDecorator.getEndTime() - spanDecorator.getStartTime());
segmentCost.setStartTime(spanDecorator.getStartTime());
segmentCost.setEndTime(spanDecorator.getEndTime());
segmentCost.setId(segmentId);
if (spanObject.getOperationNameId() == 0) {
segmentCost.setServiceName(spanObject.getOperationName());
if (spanDecorator.getOperationNameId() == 0) {
segmentCost.setServiceName(spanDecorator.getOperationName());
} else {
segmentCost.setServiceName(ServiceNameCache.get(spanObject.getOperationNameId()));
segmentCost.setServiceName(ServiceNameCache.get(spanDecorator.getOperationNameId()));
}
segmentCosts.add(segmentCost);
isError = isError || spanObject.getIsError();
isError = isError || spanDecorator.getIsError();
}
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
isError = isError || spanObject.getIsError();
public void parseEntry(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
isError = isError || spanDecorator.getIsError();
}
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
isError = isError || spanObject.getIsError();
public void parseExit(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId, String segmentId) {
isError = isError || spanDecorator.getIsError();
}
@Override
public void parseLocal(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
isError = isError || spanObject.getIsError();
public void parseLocal(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
isError = isError || spanDecorator.getIsError();
}
@Override public void build() {
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agentstream.worker.segment.standardization;
/**
* @author pengys5
*/
public interface IdExchanger<T extends StandardBuilder> {
boolean exchange(T standardBuilder, int applicationId);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agentstream.worker.segment.standardization;
import org.skywalking.apm.network.proto.RefType;
import org.skywalking.apm.network.proto.TraceSegmentReference;
import org.skywalking.apm.network.proto.UniqueId;
/**
* @author pengys5
*/
public class ReferenceDecorator implements StandardBuilder {
private boolean isOrigin = true;
private StandardBuilder standardBuilder;
private TraceSegmentReference referenceObject;
private TraceSegmentReference.Builder referenceBuilder;
public ReferenceDecorator(TraceSegmentReference referenceObject, StandardBuilder standardBuilder) {
this.referenceObject = referenceObject;
this.standardBuilder = standardBuilder;
}
public ReferenceDecorator(TraceSegmentReference.Builder referenceBuilder, StandardBuilder standardBuilder) {
this.referenceBuilder = referenceBuilder;
this.standardBuilder = standardBuilder;
this.isOrigin = false;
}
public RefType getRefType() {
if (isOrigin) {
return referenceObject.getRefType();
} else {
return referenceBuilder.getRefType();
}
}
public int getRefTypeValue() {
if (isOrigin) {
return referenceObject.getRefTypeValue();
} else {
return referenceBuilder.getRefTypeValue();
}
}
public int getEntryServiceId() {
if (isOrigin) {
return referenceObject.getEntryServiceId();
} else {
return referenceBuilder.getEntryServiceId();
}
}
public void setEntryServiceId(int value) {
if (isOrigin) {
toBuilder();
} else {
referenceBuilder.setEntryServiceId(value);
}
}
public String getEntryServiceName() {
if (isOrigin) {
return referenceObject.getEntryServiceName();
} else {
return referenceBuilder.getEntryServiceName();
}
}
public void setEntryServiceName(String value) {
if (isOrigin) {
toBuilder();
} else {
referenceBuilder.setEntryServiceName(value);
}
}
public int getEntryApplicationInstanceId() {
if (isOrigin) {
return referenceObject.getEntryApplicationInstanceId();
} else {
return referenceBuilder.getEntryApplicationInstanceId();
}
}
public int getParentApplicationInstanceId() {
if (isOrigin) {
return referenceObject.getParentApplicationInstanceId();
} else {
return referenceBuilder.getParentApplicationInstanceId();
}
}
public int getParentServiceId() {
if (isOrigin) {
return referenceObject.getParentServiceId();
} else {
return referenceBuilder.getParentServiceId();
}
}
public void setParentServiceId(int value) {
if (isOrigin) {
toBuilder();
} else {
referenceBuilder.setParentServiceId(value);
}
}
public int getParentSpanId() {
if (isOrigin) {
return referenceObject.getParentSpanId();
} else {
return referenceBuilder.getParentSpanId();
}
}
public String getParentServiceName() {
if (isOrigin) {
return referenceObject.getParentServiceName();
} else {
return referenceBuilder.getParentServiceName();
}
}
public void setParentServiceName(String value) {
if (isOrigin) {
toBuilder();
} else {
referenceBuilder.setParentServiceName(value);
}
}
public UniqueId getParentTraceSegmentId() {
if (isOrigin) {
return referenceObject.getParentTraceSegmentId();
} else {
return referenceBuilder.getParentTraceSegmentId();
}
}
public int getNetworkAddressId() {
if (isOrigin) {
return referenceObject.getNetworkAddressId();
} else {
return referenceBuilder.getNetworkAddressId();
}
}
public void setNetworkAddressId(int value) {
if (isOrigin) {
toBuilder();
} else {
referenceBuilder.setNetworkAddressId(value);
}
}
public String getNetworkAddress() {
if (isOrigin) {
return referenceObject.getNetworkAddress();
} else {
return referenceBuilder.getNetworkAddress();
}
}
public void setNetworkAddress(String value) {
if (isOrigin) {
toBuilder();
} else {
referenceBuilder.setNetworkAddress(value);
}
}
@Override public void toBuilder() {
this.isOrigin = false;
referenceBuilder = referenceObject.toBuilder();
standardBuilder.toBuilder();
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agentstream.worker.segment.standardization;
import org.skywalking.apm.collector.cache.ApplicationCache;
import org.skywalking.apm.collector.cache.InstanceCache;
import org.skywalking.apm.collector.cache.ServiceIdCache;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
private final Logger logger = LoggerFactory.getLogger(ReferenceIdExchanger.class);
private static ReferenceIdExchanger exchanger;
public static ReferenceIdExchanger getInstance() {
if (exchanger == null) {
exchanger = new ReferenceIdExchanger();
}
return exchanger;
}
@Override public boolean exchange(ReferenceDecorator standardBuilder, int applicationId) {
if (standardBuilder.getEntryServiceId() == 0 && StringUtils.isNotEmpty(standardBuilder.getEntryServiceName())) {
int entryServiceId = ServiceIdCache.get(InstanceCache.get(standardBuilder.getEntryApplicationInstanceId()), standardBuilder.getEntryServiceName());
if (entryServiceId == 0) {
return false;
} else {
standardBuilder.toBuilder();
standardBuilder.setEntryServiceId(entryServiceId);
standardBuilder.setEntryServiceName(Const.EMPTY_STRING);
}
}
if (standardBuilder.getParentServiceId() == 0 && StringUtils.isNotEmpty(standardBuilder.getParentServiceName())) {
int parentServiceId = ServiceIdCache.get(InstanceCache.get(standardBuilder.getParentApplicationInstanceId()), standardBuilder.getParentServiceName());
if (parentServiceId == 0) {
return false;
} else {
standardBuilder.toBuilder();
standardBuilder.setParentServiceId(parentServiceId);
standardBuilder.setParentServiceName(Const.EMPTY_STRING);
}
}
if (standardBuilder.getNetworkAddressId() == 0 && StringUtils.isNotEmpty(standardBuilder.getNetworkAddress())) {
int networkAddressId = ApplicationCache.get(standardBuilder.getNetworkAddress());
if (networkAddressId == 0) {
return false;
} else {
standardBuilder.toBuilder();
standardBuilder.setNetworkAddressId(networkAddressId);
standardBuilder.setNetworkAddress(Const.EMPTY_STRING);
}
}
return true;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agentstream.worker.segment.standardization;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.network.proto.UniqueId;
/**
* @author pengys5
*/
public class SegmentDecorator implements StandardBuilder {
private boolean isOrigin = true;
private final TraceSegmentObject segmentObject;
private TraceSegmentObject.Builder segmentBuilder;
public SegmentDecorator(TraceSegmentObject segmentObject) {
this.segmentObject = segmentObject;
}
public int getApplicationId() {
return segmentObject.getApplicationId();
}
public int getApplicationInstanceId() {
return segmentObject.getApplicationInstanceId();
}
public UniqueId getTraceSegmentId() {
return segmentObject.getTraceSegmentId();
}
public int getSpansCount() {
return segmentObject.getSpansCount();
}
public SpanDecorator getSpans(int index) {
if (isOrigin) {
return new SpanDecorator(segmentObject.getSpans(index), this);
} else {
return new SpanDecorator(segmentBuilder.getSpansBuilder(index), this);
}
}
public int getRefsCount() {
return segmentObject.getRefsCount();
}
public ReferenceDecorator getRefs(int index) {
if (isOrigin) {
return new ReferenceDecorator(segmentObject.getRefs(index), this);
} else {
return new ReferenceDecorator(segmentBuilder.getRefsBuilder(index), this);
}
}
public byte[] toByteArray() {
if (isOrigin) {
return segmentObject.toByteArray();
} else {
return segmentBuilder.build().toByteArray();
}
}
@Override public void toBuilder() {
if (!isOrigin) {
this.isOrigin = false;
this.segmentBuilder = segmentObject.toBuilder();
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agentstream.worker.segment.standardization;
import org.skywalking.apm.collector.agentstream.worker.segment.buffer.SegmentBufferManager;
import org.skywalking.apm.collector.core.queue.EndOfBatchCommand;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.FlushAndSwitch;
import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.skywalking.apm.network.proto.UpstreamSegment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class SegmentStandardizationWorker extends AbstractLocalAsyncWorker {
private final Logger logger = LoggerFactory.getLogger(SegmentStandardizationWorker.class);
public SegmentStandardizationWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
SegmentBufferManager.INSTANCE.initialize();
}
@Override protected void onWork(Object message) throws WorkerException {
if (message instanceof FlushAndSwitch) {
SegmentBufferManager.INSTANCE.flush();
} else if (message instanceof EndOfBatchCommand) {
} else if (message instanceof UpstreamSegment) {
UpstreamSegment upstreamSegment = (UpstreamSegment)message;
SegmentBufferManager.INSTANCE.writeBuffer(upstreamSegment);
} else {
logger.error("unhandled message, message instance must UpstreamSegment, but is %s", message.getClass().toString());
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<SegmentStandardizationWorker> {
@Override
public Role role() {
return SegmentStandardizationWorker.WorkerRole.INSTANCE;
}
@Override
public SegmentStandardizationWorker workerInstance(ClusterWorkerContext clusterContext) {
return new SegmentStandardizationWorker(role(), clusterContext);
}
@Override public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return SegmentStandardizationWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new ForeverFirstSelector();
}
@Override public DataDefine dataDefine() {
return null;
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agentstream.worker.segment.standardization;
import org.skywalking.apm.network.proto.SpanLayer;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.SpanType;
/**
* @author pengys5
*/
public class SpanDecorator implements StandardBuilder {
private boolean isOrigin = true;
private StandardBuilder standardBuilder;
private SpanObject spanObject;
private SpanObject.Builder spanBuilder;
public SpanDecorator(SpanObject spanObject, StandardBuilder standardBuilder) {
this.spanObject = spanObject;
this.standardBuilder = standardBuilder;
}
public SpanDecorator(SpanObject.Builder spanBuilder, StandardBuilder standardBuilder) {
this.spanBuilder = spanBuilder;
this.standardBuilder = standardBuilder;
this.isOrigin = false;
}
public int getSpanId() {
if (isOrigin) {
return spanObject.getSpanId();
} else {
return spanBuilder.getSpanId();
}
}
public int getParentSpanId() {
if (isOrigin) {
return spanObject.getParentSpanId();
} else {
return spanBuilder.getParentSpanId();
}
}
public SpanType getSpanType() {
if (isOrigin) {
return spanObject.getSpanType();
} else {
return spanBuilder.getSpanType();
}
}
public int getSpanTypeValue() {
if (isOrigin) {
return spanObject.getSpanTypeValue();
} else {
return spanBuilder.getSpanTypeValue();
}
}
public SpanLayer getSpanLayer() {
if (isOrigin) {
return spanObject.getSpanLayer();
} else {
return spanBuilder.getSpanLayer();
}
}
public int getSpanLayerValue() {
if (isOrigin) {
return spanObject.getSpanLayerValue();
} else {
return spanBuilder.getSpanLayerValue();
}
}
public long getStartTime() {
if (isOrigin) {
return spanObject.getStartTime();
} else {
return spanBuilder.getStartTime();
}
}
public long getEndTime() {
if (isOrigin) {
return spanObject.getEndTime();
} else {
return spanBuilder.getEndTime();
}
}
public int getComponentId() {
if (isOrigin) {
return spanObject.getComponentId();
} else {
return spanBuilder.getComponentId();
}
}
public String getComponent() {
if (isOrigin) {
return spanObject.getComponent();
} else {
return spanBuilder.getComponent();
}
}
public int getPeerId() {
if (isOrigin) {
return spanObject.getPeerId();
} else {
return spanBuilder.getPeerId();
}
}
public void setPeerId(int peerId) {
if (isOrigin) {
toBuilder();
} else {
spanBuilder.setPeerId(peerId);
}
}
public String getPeer() {
if (isOrigin) {
return spanObject.getPeer();
} else {
return spanBuilder.getPeer();
}
}
public void setPeer(String peer) {
if (isOrigin) {
toBuilder();
} else {
spanBuilder.setPeer(peer);
}
}
public int getOperationNameId() {
if (isOrigin) {
return spanObject.getOperationNameId();
} else {
return spanBuilder.getOperationNameId();
}
}
public void setOperationNameId(int value) {
if (isOrigin) {
toBuilder();
} else {
spanBuilder.setOperationNameId(value);
}
}
public String getOperationName() {
if (isOrigin) {
return spanObject.getOperationName();
} else {
return spanBuilder.getOperationName();
}
}
public void setOperationName(String value) {
if (isOrigin) {
toBuilder();
} else {
spanBuilder.setOperationName(value);
}
}
public boolean getIsError() {
if (isOrigin) {
return spanObject.getIsError();
} else {
return spanBuilder.getIsError();
}
}
@Override public void toBuilder() {
this.isOrigin = false;
spanBuilder = spanObject.toBuilder();
standardBuilder.toBuilder();
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agentstream.worker.segment.standardization;
import org.skywalking.apm.collector.cache.ApplicationCache;
import org.skywalking.apm.collector.cache.ServiceIdCache;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class SpanIdExchanger implements IdExchanger<SpanDecorator> {
private static SpanIdExchanger exchanger;
public static SpanIdExchanger getInstance() {
if (exchanger == null) {
exchanger = new SpanIdExchanger();
}
return exchanger;
}
@Override public boolean exchange(SpanDecorator standardBuilder, int applicationId) {
if (standardBuilder.getPeerId() == 0 && StringUtils.isNotEmpty(standardBuilder.getPeer())) {
int peerId = ApplicationCache.get(standardBuilder.getPeer());
if (peerId == 0) {
return false;
} else {
standardBuilder.toBuilder();
standardBuilder.setPeerId(peerId);
standardBuilder.setPeer(Const.EMPTY_STRING);
}
}
if (standardBuilder.getOperationNameId() == 0 && StringUtils.isNotEmpty(standardBuilder.getOperationName())) {
int operationNameId = ServiceIdCache.get(applicationId, standardBuilder.getOperationName());
if (operationNameId == 0) {
return false;
} else {
standardBuilder.toBuilder();
standardBuilder.setOperationNameId(operationNameId);
standardBuilder.setOperationName(Const.EMPTY_STRING);
}
}
return true;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agentstream.worker.segment.standardization;
/**
* @author pengys5
*/
public interface StandardBuilder {
void toBuilder();
}
......@@ -21,7 +21,8 @@ package org.skywalking.apm.collector.agentstream.worker.service.entry;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.cache.ServiceNameCache;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.ReferenceDecorator;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.SpanDecorator;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
......@@ -30,8 +31,6 @@ import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.TraceSegmentReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -46,29 +45,25 @@ public class ServiceEntrySpanListener implements RefsListener, FirstSpanListener
private boolean hasReference = false;
private int applicationId;
private int entryServiceId;
private String entryServiceName;
private boolean hasEntry = false;
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
public void parseEntry(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
this.applicationId = applicationId;
this.entryServiceId = spanObject.getOperationNameId();
if (spanObject.getOperationNameId() == 0) {
this.entryServiceName = spanObject.getOperationName();
} else {
this.entryServiceName = ServiceNameCache.get(this.entryServiceId);
}
this.entryServiceId = spanDecorator.getOperationNameId();
this.hasEntry = true;
}
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId,
@Override public void parseRef(ReferenceDecorator referenceDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
hasReference = true;
}
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
public void parseFirst(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime());
}
@Override public void build() {
......@@ -76,10 +71,9 @@ public class ServiceEntrySpanListener implements RefsListener, FirstSpanListener
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
if (!hasReference && hasEntry) {
ServiceEntryDataDefine.ServiceEntry serviceEntry = new ServiceEntryDataDefine.ServiceEntry();
serviceEntry.setId(applicationId + Const.ID_SPLIT + entryServiceName);
serviceEntry.setId(applicationId + Const.ID_SPLIT + entryServiceId);
serviceEntry.setApplicationId(applicationId);
serviceEntry.setEntryServiceId(entryServiceId);
serviceEntry.setEntryServiceName(entryServiceName);
serviceEntry.setRegisterTime(timeBucket);
serviceEntry.setNewestTime(timeBucket);
......
......@@ -23,7 +23,8 @@ import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.cache.InstanceCache;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.ReferenceDecorator;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.SpanDecorator;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
......@@ -32,8 +33,6 @@ import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.TraceSegmentReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -44,9 +43,8 @@ public class ServiceReferenceSpanListener implements FirstSpanListener, EntrySpa
private final Logger logger = LoggerFactory.getLogger(ServiceReferenceSpanListener.class);
private List<TraceSegmentReference> referenceServices = new LinkedList<>();
private List<ReferenceDecorator> referenceServices = new LinkedList<>();
private int serviceId = 0;
private String serviceName = "";
private long startTime = 0;
private long endTime = 0;
private boolean isError = false;
......@@ -54,26 +52,23 @@ public class ServiceReferenceSpanListener implements FirstSpanListener, EntrySpa
private boolean hasEntry = false;
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
public void parseFirst(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime());
}
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId,
@Override public void parseRef(ReferenceDecorator referenceDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
referenceServices.add(reference);
referenceServices.add(referenceDecorator);
}
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
serviceId = spanObject.getOperationNameId();
if (spanObject.getOperationNameId() == 0) {
serviceName = String.valueOf(applicationId) + Const.ID_SPLIT + spanObject.getOperationName();
} else {
serviceName = Const.EMPTY_STRING;
}
startTime = spanObject.getStartTime();
endTime = spanObject.getEndTime();
isError = spanObject.getIsError();
public void parseEntry(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
serviceId = spanDecorator.getOperationNameId();
startTime = spanDecorator.getStartTime();
endTime = spanDecorator.getEndTime();
isError = spanDecorator.getIsError();
this.hasEntry = true;
}
......@@ -105,71 +100,39 @@ public class ServiceReferenceSpanListener implements FirstSpanListener, EntrySpa
referenceServices.forEach(reference -> {
ServiceReferenceDataDefine.ServiceReference serviceReference = new ServiceReferenceDataDefine.ServiceReference();
int entryServiceId = reference.getEntryServiceId();
String entryServiceName = buildServiceName(reference.getEntryApplicationInstanceId(), reference.getEntryServiceId(), reference.getEntryServiceName());
int frontServiceId = reference.getParentServiceId();
String frontServiceName = buildServiceName(reference.getParentApplicationInstanceId(), reference.getParentServiceId(), reference.getParentServiceName());
int behindServiceId = serviceId;
String behindServiceName = serviceName;
calculateCost(serviceReference, startTime, endTime, isError);
logger.debug("has reference, entryServiceId: {}, entryServiceName: {}", entryServiceId, entryServiceName);
sendToAggregationWorker(context, serviceReference, entryServiceId, entryServiceName, frontServiceId, frontServiceName, behindServiceId, behindServiceName);
logger.debug("has reference, entryServiceId: {}", entryServiceId);
sendToAggregationWorker(context, serviceReference, entryServiceId, frontServiceId, behindServiceId);
});
} else {
ServiceReferenceDataDefine.ServiceReference serviceReference = new ServiceReferenceDataDefine.ServiceReference();
int entryServiceId = serviceId;
String entryServiceName = serviceName;
int frontServiceId = Const.NONE_SERVICE_ID;
String frontServiceName = Const.EMPTY_STRING;
int behindServiceId = serviceId;
String behindServiceName = serviceName;
calculateCost(serviceReference, startTime, endTime, isError);
sendToAggregationWorker(context, serviceReference, entryServiceId, entryServiceName, frontServiceId, frontServiceName, behindServiceId, behindServiceName);
sendToAggregationWorker(context, serviceReference, entryServiceId, frontServiceId, behindServiceId);
}
}
}
private void sendToAggregationWorker(StreamModuleContext context,
ServiceReferenceDataDefine.ServiceReference serviceReference, int entryServiceId, String entryServiceName,
int frontServiceId, String frontServiceName, int behindServiceId, String behindServiceName) {
ServiceReferenceDataDefine.ServiceReference serviceReference, int entryServiceId, int frontServiceId,
int behindServiceId) {
StringBuilder idBuilder = new StringBuilder();
idBuilder.append(timeBucket).append(Const.ID_SPLIT);
if (entryServiceId == 0) {
idBuilder.append(entryServiceName).append(Const.ID_SPLIT);
serviceReference.setEntryServiceId(0);
serviceReference.setEntryServiceName(entryServiceName);
} else {
idBuilder.append(entryServiceId).append(Const.ID_SPLIT);
serviceReference.setEntryServiceId(entryServiceId);
serviceReference.setEntryServiceName(Const.EMPTY_STRING);
}
if (frontServiceId == 0) {
idBuilder.append(frontServiceName).append(Const.ID_SPLIT);
serviceReference.setFrontServiceId(0);
serviceReference.setFrontServiceName(frontServiceName);
} else {
idBuilder.append(frontServiceId).append(Const.ID_SPLIT);
serviceReference.setFrontServiceId(frontServiceId);
serviceReference.setFrontServiceName(Const.EMPTY_STRING);
}
idBuilder.append(entryServiceId).append(Const.ID_SPLIT);
serviceReference.setEntryServiceId(entryServiceId);
if (behindServiceId == 0) {
idBuilder.append(behindServiceName);
serviceReference.setBehindServiceId(0);
serviceReference.setBehindServiceName(behindServiceName);
} else {
idBuilder.append(behindServiceId);
serviceReference.setBehindServiceId(behindServiceId);
serviceReference.setBehindServiceName(Const.EMPTY_STRING);
}
idBuilder.append(frontServiceId).append(Const.ID_SPLIT);
serviceReference.setFrontServiceId(frontServiceId);
idBuilder.append(behindServiceId);
serviceReference.setBehindServiceId(behindServiceId);
serviceReference.setId(idBuilder.toString());
serviceReference.setTimeBucket(timeBucket);
......@@ -180,13 +143,4 @@ public class ServiceReferenceSpanListener implements FirstSpanListener, EntrySpa
logger.error(e.getMessage(), e);
}
}
private String buildServiceName(int instanceId, int serviceId, String serviceName) {
if (serviceId == 0) {
int applicationId = InstanceCache.get(instanceId);
return String.valueOf(applicationId) + Const.ID_SPLIT + serviceName;
} else {
return Const.EMPTY_STRING;
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agentstream.worker.util;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.skywalking.apm.collector.core.util.Const;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public enum FileUtils {
INSTANCE;
private final Logger logger = LoggerFactory.getLogger(FileUtils.class);
public String readLastLine(File file) {
RandomAccessFile randomAccessFile = null;
try {
randomAccessFile = new RandomAccessFile(file, "r");
long length = randomAccessFile.length();
if (length == 0) {
return Const.EMPTY_STRING;
} else {
long position = length - 1;
randomAccessFile.seek(position);
while (position >= 0) {
if (randomAccessFile.read() == '\n') {
return randomAccessFile.readLine();
}
randomAccessFile.seek(position);
if (position == 0) {
return randomAccessFile.readLine();
}
position--;
}
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
} finally {
if (randomAccessFile != null) {
try {
randomAccessFile.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
return Const.EMPTY_STRING;
}
public void writeAppendToLast(File file, RandomAccessFile randomAccessFile, String value) {
if (randomAccessFile == null) {
try {
randomAccessFile = new RandomAccessFile(file, "rwd");
} catch (FileNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
try {
long length = randomAccessFile.length();
randomAccessFile.seek(length);
randomAccessFile.writeBytes(System.lineSeparator());
randomAccessFile.writeBytes(value);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
......@@ -15,6 +15,8 @@ org.skywalking.apm.collector.agentstream.worker.serviceref.ServiceReferencePersi
org.skywalking.apm.collector.agentstream.worker.segment.origin.SegmentPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.segment.cost.SegmentCostPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.segment.standardization.SegmentStandardizationWorker$Factory
org.skywalking.apm.collector.agentstream.worker.global.GlobalTracePersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.instance.performance.InstPerformancePersistenceWorker$Factory
\ No newline at end of file
......@@ -21,6 +21,7 @@ package org.skywalking.apm.collector.agentstream.grpc.handler;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import org.junit.Test;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.KeyWithStringValue;
import org.skywalking.apm.network.proto.LogMessage;
......@@ -45,6 +46,7 @@ public class TraceSegmentServiceHandlerTestCase {
private TraceSegmentServiceGrpc.TraceSegmentServiceStub stub;
@Test
public void testCollect() {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build();
stub = TraceSegmentServiceGrpc.newStub(channel);
......@@ -70,7 +72,7 @@ public class TraceSegmentServiceHandlerTestCase {
streamObserver.onCompleted();
try {
Thread.sleep(30000);
Thread.sleep(10000);
} catch (InterruptedException e) {
}
}
......
......@@ -60,6 +60,8 @@ public class SegmentPost {
applicationEsDAO.save(consumerApplication);
ApplicationDataDefine.Application providerApplication = new ApplicationDataDefine.Application("3", "dubbox-provider", 3);
applicationEsDAO.save(providerApplication);
// ApplicationDataDefine.Application peer = new ApplicationDataDefine.Application("4", "172.25.0.4:20880", 4);
// applicationEsDAO.save(peer);
ServiceNameEsDAO serviceNameEsDAO = new ServiceNameEsDAO();
serviceNameEsDAO.setClient(client);
......@@ -84,6 +86,7 @@ public class SegmentPost {
DIFF = 0;
Thread.sleep(1000);
break;
}
}
......
......@@ -16,17 +16,16 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agentstream.worker.util;
package org.skywalking.apm.collector.agentstream.worker.segment.buffer;
import org.junit.Test;
/**
* @author pengys5
*/
public enum ExchangeMarkUtils {
INSTANCE;
private static final String MARK_TAG = "M";
public class OffsetManagerTestCase {
public String buildMarkedID(int id) {
return MARK_TAG + id;
@Test
public void test() {
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agentstream.worker.segment.buffer;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.SegmentStandardizationWorker;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.network.proto.SpanLayer;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.SpanType;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.network.proto.UniqueId;
import org.skywalking.apm.network.proto.UpstreamSegment;
import org.skywalking.apm.network.trace.component.ComponentsDefine;
/**
* @author pengys5
*/
public class SegmentBufferWriteWorkerTestCase {
public static void main(String[] args) throws WorkerException, ProviderNotFoundException {
SegmentBufferConfig.BUFFER_PATH = "/Users/pengys5/code/sky-walking/sky-walking/apm-collector/buffer/";
SegmentStandardizationWorker worker = new SegmentStandardizationWorker(null, null);
worker.preStart();
worker.allocateJob(buildSegment());
worker.allocateJob(buildSegment());
worker.allocateJob(buildSegment());
worker.allocateJob(buildSegment());
worker.allocateJob(buildSegment());
}
private static UpstreamSegment buildSegment() {
long now = System.currentTimeMillis();
int id = 1;
UniqueId.Builder builder = UniqueId.newBuilder();
builder.addIdParts(id);
builder.addIdParts(id);
builder.addIdParts(id);
UniqueId segmentId = builder.build();
UpstreamSegment.Builder upstream = UpstreamSegment.newBuilder();
upstream.addGlobalTraceIds(segmentId);
TraceSegmentObject.Builder segmentBuilder = TraceSegmentObject.newBuilder();
segmentBuilder.setApplicationId(1);
segmentBuilder.setApplicationInstanceId(1);
segmentBuilder.setTraceSegmentId(segmentId);
SpanObject.Builder entrySpan = SpanObject.newBuilder();
entrySpan.setSpanId(0);
entrySpan.setSpanType(SpanType.Entry);
entrySpan.setSpanLayer(SpanLayer.Http);
entrySpan.setParentSpanId(-1);
entrySpan.setStartTime(now);
entrySpan.setEndTime(now + 3000);
entrySpan.setComponentId(ComponentsDefine.TOMCAT.getId());
entrySpan.setOperationNameId(1);
entrySpan.setIsError(false);
segmentBuilder.addSpans(entrySpan);
SpanObject.Builder exitSpan = SpanObject.newBuilder();
exitSpan.setSpanId(1);
exitSpan.setSpanType(SpanType.Exit);
exitSpan.setSpanLayer(SpanLayer.Database);
exitSpan.setParentSpanId(0);
exitSpan.setStartTime(now);
exitSpan.setEndTime(now + 3000);
exitSpan.setComponentId(ComponentsDefine.MONGODB.getId());
exitSpan.setOperationNameId(2);
exitSpan.setIsError(false);
exitSpan.setPeer("localhost:8888");
segmentBuilder.addSpans(exitSpan);
upstream.setSegment(segmentBuilder.build().toByteString());
return upstream.build();
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2017, OpenSkywalking Organization All rights reserved.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
~ Project repository: https://github.com/OpenSkywalking/skywalking
-->
<Configuration status="debug">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
</Console>
</Appenders>
<Loggers>
<logger name="org.eclipse.jetty" level="INFO"/>
<logger name="org.apache.zookeeper" level="INFO"/>
<logger name="org.skywalking.apm.collector.agentstream.worker.storage.PersistenceTimer" level="INFO"/>
<Root level="debug">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
......@@ -19,6 +19,7 @@
package org.skywalking.apm.collector.boot;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.config.SystemConfigParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -31,6 +32,7 @@ public class CollectorBootStartUp {
public static void main(String[] args) throws CollectorException {
logger.info("collector starting...");
SystemConfigParser.INSTANCE.parse();
CollectorStarter starter = new CollectorStarter();
starter.start();
logger.info("collector start successful.");
......
......@@ -53,6 +53,9 @@ public class CollectorStarter implements Starter {
ServerHolder serverHolder = new ServerHolder();
for (ModuleGroupDefine moduleGroupDefine : moduleGroupDefineMap.values()) {
if (moduleGroupDefine.groupConfigParser() != null) {
moduleGroupDefine.groupConfigParser().parse(configuration.get(moduleGroupDefine.name()));
}
moduleGroupDefine.moduleInstaller().injectConfiguration(configuration.get(moduleGroupDefine.name()), moduleDefineMap.get(moduleGroupDefine.name()));
moduleGroupDefine.moduleInstaller().injectServerHolder(serverHolder);
moduleGroupDefine.moduleInstaller().preInstall();
......
......@@ -15,6 +15,9 @@ agent_stream:
host: localhost
port: 12800
context_path: /
config:
buffer_offset_max_file_size: 1k
buffer_segment_max_file_size: 10k
ui:
jetty:
host: localhost
......
......@@ -17,7 +17,7 @@
~ Project repository: https://github.com/OpenSkywalking/skywalking
-->
<Configuration status="error">
<Configuration status="debug">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
......@@ -27,7 +27,8 @@
<logger name="org.eclipse.jetty" level="INFO"/>
<logger name="org.apache.zookeeper" level="INFO"/>
<logger name="org.skywalking.apm.collector.agentstream.worker.storage.PersistenceTimer" level="INFO"/>
<Root level="info">
<logger name="io.grpc.netty.NettyServerHandler" level="INFO"/>
<Root level="debug">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
......
......@@ -22,28 +22,31 @@ import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.cache.dao.IApplicationDAO;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ApplicationCache {
private static final Logger logger = LoggerFactory.getLogger(ApplicationCache.class);
private static Cache<String, Integer> CODE_CACHE = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(1000).build();
public static int get(String applicationCode) {
IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName());
int applicationId = 0;
try {
applicationId = CODE_CACHE.get(applicationCode, () -> {
IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName());
return dao.getApplicationId(applicationCode);
});
applicationId = CODE_CACHE.get(applicationCode, () -> dao.getApplicationId(applicationCode));
} catch (Throwable e) {
return applicationId;
logger.error(e.getMessage(), e);
}
if (applicationId == 0) {
IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName());
applicationId = dao.getApplicationId(applicationCode);
if (applicationId != 0) {
CODE_CACHE.put(applicationCode, applicationId);
......@@ -55,22 +58,20 @@ public class ApplicationCache {
private static Cache<Integer, String> ID_CACHE = CacheBuilder.newBuilder().maximumSize(1000).build();
public static String get(int applicationId) {
IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName());
String applicationCode = Const.EMPTY_STRING;
try {
return ID_CACHE.get(applicationId, () -> {
IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName());
return dao.getApplicationCode(applicationId);
});
applicationCode = ID_CACHE.get(applicationId, () -> dao.getApplicationCode(applicationId));
} catch (Throwable e) {
return Const.EXCEPTION;
logger.error(e.getMessage(), e);
}
}
public static String getForUI(int applicationId) {
String applicationCode = get(applicationId);
if (applicationCode.equals("Unknown")) {
IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName());
if (StringUtils.isEmpty(applicationCode)) {
applicationCode = dao.getApplicationCode(applicationId);
ID_CACHE.put(applicationId, applicationCode);
if (StringUtils.isNotEmpty(applicationCode)) {
CODE_CACHE.put(applicationCode, applicationId);
}
}
return applicationCode;
}
......
......@@ -22,22 +22,34 @@ import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.cache.dao.IInstanceDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class InstanceCache {
private static Cache<Integer, Integer> CACHE = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(5000).build();
private static final Logger logger = LoggerFactory.getLogger(InstanceCache.class);
private static Cache<Integer, Integer> INSTANCE_CACHE = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(5000).build();
public static int get(int applicationInstanceId) {
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
int applicationId = 0;
try {
return CACHE.get(applicationInstanceId, () -> {
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
return dao.getApplicationId(applicationInstanceId);
});
applicationId = INSTANCE_CACHE.get(applicationInstanceId, () -> dao.getApplicationId(applicationInstanceId));
} catch (Throwable e) {
return 0;
logger.error(e.getMessage(), e);
}
if (applicationId == 0) {
applicationId = dao.getApplicationId(applicationInstanceId);
if (applicationId != 0) {
INSTANCE_CACHE.put(applicationInstanceId, applicationId);
}
}
return applicationId;
}
}
......@@ -23,32 +23,34 @@ import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.cache.dao.IServiceNameDAO;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceIdCache {
private static final Logger logger = LoggerFactory.getLogger(ServiceIdCache.class);
//TODO size configuration
private static Cache<String, Integer> CACHE = CacheBuilder.newBuilder().maximumSize(1000).build();
private static Cache<String, Integer> SERVICE_CACHE = CacheBuilder.newBuilder().maximumSize(1000).build();
public static int get(int applicationId, String serviceName) {
IServiceNameDAO dao = (IServiceNameDAO)DAOContainer.INSTANCE.get(IServiceNameDAO.class.getName());
int serviceId = 0;
try {
return CACHE.get(applicationId + Const.ID_SPLIT + serviceName, () -> {
IServiceNameDAO dao = (IServiceNameDAO)DAOContainer.INSTANCE.get(IServiceNameDAO.class.getName());
return dao.getServiceId(applicationId, serviceName);
});
serviceId = SERVICE_CACHE.get(applicationId + Const.ID_SPLIT + serviceName, () -> dao.getServiceId(applicationId, serviceName));
} catch (Throwable e) {
return 0;
logger.error(e.getMessage(), e);
}
}
public static int getForUI(int applicationId, String serviceName) {
int serviceId = get(applicationId, serviceName);
if (serviceId == 0) {
IServiceNameDAO dao = (IServiceNameDAO)DAOContainer.INSTANCE.get(IServiceNameDAO.class.getName());
serviceId = dao.getServiceId(applicationId, serviceName);
CACHE.put(applicationId + Const.ID_SPLIT + serviceName, serviceId);
if (serviceId != 0) {
SERVICE_CACHE.put(applicationId + Const.ID_SPLIT + serviceName, serviceId);
}
}
return serviceId;
}
......
......@@ -22,34 +22,38 @@ import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.cache.dao.IServiceNameDAO;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceNameCache {
private static final Logger logger = LoggerFactory.getLogger(ServiceNameCache.class);
//TODO size configuration
private static Cache<Integer, String> CACHE = CacheBuilder.newBuilder().maximumSize(10000).build();
public static String get(int serviceId) {
IServiceNameDAO dao = (IServiceNameDAO)DAOContainer.INSTANCE.get(IServiceNameDAO.class.getName());
String serviceName = Const.EMPTY_STRING;
try {
return CACHE.get(serviceId, () -> {
IServiceNameDAO dao = (IServiceNameDAO)DAOContainer.INSTANCE.get(IServiceNameDAO.class.getName());
return dao.getServiceName(serviceId);
});
serviceName = CACHE.get(serviceId, () -> dao.getServiceName(serviceId));
} catch (Throwable e) {
return Const.EXCEPTION;
logger.error(e.getMessage(), e);
}
}
public static String getForUI(int serviceId) {
String serviceName = get(serviceId);
if (serviceName.equals("Unknown")) {
IServiceNameDAO dao = (IServiceNameDAO)DAOContainer.INSTANCE.get(IServiceNameDAO.class.getName());
if (StringUtils.isEmpty(serviceName)) {
serviceName = dao.getServiceName(serviceId);
CACHE.put(serviceId, serviceName);
if (StringUtils.isNotEmpty(serviceName)) {
CACHE.put(serviceId, serviceName);
}
}
return serviceName;
}
}
......@@ -19,6 +19,7 @@
package org.skywalking.apm.collector.cluster;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.GroupConfigParser;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
......@@ -46,4 +47,8 @@ public class ClusterModuleGroupDefine implements ModuleGroupDefine {
@Override public ModuleInstaller moduleInstaller() {
return installer;
}
@Override public GroupConfigParser groupConfigParser() {
return null;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.core.config;
import java.util.Map;
/**
* @author pengys5
*/
public interface GroupConfigParser {
String NODE_NAME = "config";
void parse(Map<String, Map> config);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.core.config;
/**
* @author pengys5
*/
public class SystemConfig {
public static String DATA_PATH = "../data";
}
......@@ -16,36 +16,21 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agentstream;
package org.skywalking.apm.collector.core.config;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.storage.PersistenceTimer;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.MultipleCommonModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class AgentStreamCommonModuleInstaller extends MultipleCommonModuleInstaller {
public enum SystemConfigParser {
INSTANCE;
@Override public String groupName() {
return AgentStreamModuleGroupDefine.GROUP_NAME;
}
@Override public Context moduleContext() {
return new AgentStreamModuleContext(groupName());
}
@Override public List<String> dependenceModules() {
return null;
}
private static final String DATA_PATH = "data.path";
@Override public void install() throws DefineException, ConfigException, ServerException, ClientException {
super.install();
new PersistenceTimer().start();
public void parse() {
if (!StringUtils.isEmpty(System.getProperty(DATA_PATH))) {
SystemConfig.DATA_PATH = System.getProperty(DATA_PATH);
}
}
}
......@@ -36,11 +36,11 @@ public abstract class CommonModuleInstaller implements ModuleInstaller {
this.moduleDefineMap = moduleDefineMap;
}
protected final Map<String, Map> getModuleConfig() {
final Map<String, Map> getModuleConfig() {
return moduleConfig;
}
protected final Map<String, ModuleDefine> getModuleDefineMap() {
final Map<String, ModuleDefine> getModuleDefineMap() {
return moduleDefineMap;
}
......
......@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.core.module;
import org.skywalking.apm.collector.core.config.GroupConfigParser;
import org.skywalking.apm.collector.core.framework.Context;
/**
......@@ -29,4 +30,6 @@ public interface ModuleGroupDefine {
Context groupContext();
ModuleInstaller moduleInstaller();
GroupConfigParser groupConfigParser();
}
......@@ -37,11 +37,11 @@ import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class MultipleCommonModuleInstaller extends CommonModuleInstaller {
public abstract class MultipleModuleInstaller extends CommonModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(MultipleCommonModuleInstaller.class);
private final Logger logger = LoggerFactory.getLogger(MultipleModuleInstaller.class);
public MultipleCommonModuleInstaller() {
public MultipleModuleInstaller() {
moduleDefines = new LinkedList<>();
}
......
......@@ -31,4 +31,5 @@ public class Const {
public static final String UNKNOWN = "Unknown";
public static final String EXCEPTION = "Exception";
public static final String EMPTY_STRING = "";
public static final String FILE_SUFFIX = "sw";
}
......@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.queue;
import org.skywalking.apm.collector.core.config.GroupConfigParser;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
......@@ -45,4 +46,8 @@ public class QueueModuleGroupDefine implements ModuleGroupDefine {
@Override public ModuleInstaller moduleInstaller() {
return installer;
}
@Override public GroupConfigParser groupConfigParser() {
return null;
}
}
......@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.storage;
import org.skywalking.apm.collector.core.config.GroupConfigParser;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
......@@ -45,4 +46,8 @@ public class StorageModuleGroupDefine implements ModuleGroupDefine {
@Override public ModuleInstaller moduleInstaller() {
return installer;
}
@Override public GroupConfigParser groupConfigParser() {
return null;
}
}
......@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.stream;
import org.skywalking.apm.collector.core.config.GroupConfigParser;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
......@@ -45,4 +46,8 @@ public class StreamModuleGroupDefine implements ModuleGroupDefine {
@Override public ModuleInstaller moduleInstaller() {
return installer;
}
@Override public GroupConfigParser groupConfigParser() {
return null;
}
}
......@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.ui;
import org.skywalking.apm.collector.core.config.GroupConfigParser;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
......@@ -28,10 +29,10 @@ import org.skywalking.apm.collector.core.module.ModuleInstaller;
public class UIModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "ui";
private final UICommonModuleInstaller installer;
private final UIModuleInstaller installer;
public UIModuleGroupDefine() {
installer = new UICommonModuleInstaller();
installer = new UIModuleInstaller();
}
@Override public String name() {
......@@ -45,4 +46,8 @@ public class UIModuleGroupDefine implements ModuleGroupDefine {
@Override public ModuleInstaller moduleInstaller() {
return installer;
}
@Override public GroupConfigParser groupConfigParser() {
return null;
}
}
......@@ -20,12 +20,12 @@ package org.skywalking.apm.collector.ui;
import java.util.List;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.MultipleCommonModuleInstaller;
import org.skywalking.apm.collector.core.module.MultipleModuleInstaller;
/**
* @author pengys5
*/
public class UICommonModuleInstaller extends MultipleCommonModuleInstaller {
public class UIModuleInstaller extends MultipleModuleInstaller {
@Override public String groupName() {
return UIModuleGroupDefine.GROUP_NAME;
......
......@@ -112,7 +112,7 @@ public class InstanceEsDAO extends EsDAO implements IInstanceDAO {
JsonObject application = new JsonObject();
application.addProperty("applicationId", applicationId);
application.addProperty("applicationCode", ApplicationCache.getForUI(applicationId));
application.addProperty("applicationCode", ApplicationCache.get(applicationId));
application.addProperty("instanceCount", instanceCount.getValue());
applications.add(application);
}
......
......@@ -106,7 +106,7 @@ public class NodeComponentEsDAO extends EsDAO implements INodeComponentDAO {
int peerId = peerIdBucket.getKeyAsNumber().intValue();
if (peerId != 0) {
String peer = ApplicationCache.getForUI(peerId);
String peer = ApplicationCache.get(peerId);
JsonObject nodeComponentObj = new JsonObject();
nodeComponentObj.addProperty("componentName", componentName);
......
......@@ -58,11 +58,11 @@ public class NodeMappingEsDAO extends EsDAO implements INodeMappingDAO {
JsonArray nodeMappingArray = new JsonArray();
for (Terms.Bucket applicationIdBucket : applicationIdTerms.getBuckets()) {
int applicationId = applicationIdBucket.getKeyAsNumber().intValue();
String applicationCode = ApplicationCache.getForUI(applicationId);
String applicationCode = ApplicationCache.get(applicationId);
Terms addressIdTerms = applicationIdBucket.getAggregations().get(NodeMappingTable.COLUMN_ADDRESS_ID);
for (Terms.Bucket addressIdBucket : addressIdTerms.getBuckets()) {
int addressId = addressIdBucket.getKeyAsNumber().intValue();
String address = ApplicationCache.getForUI(addressId);
String address = ApplicationCache.get(addressId);
if (addressId != 0) {
JsonObject nodeMappingObj = new JsonObject();
......
......@@ -72,13 +72,13 @@ public class NodeReferenceEsDAO extends EsDAO implements INodeReferenceDAO {
Terms frontApplicationIdTerms = searchResponse.getAggregations().get(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID);
for (Terms.Bucket frontApplicationIdBucket : frontApplicationIdTerms.getBuckets()) {
int applicationId = frontApplicationIdBucket.getKeyAsNumber().intValue();
String applicationCode = ApplicationCache.getForUI(applicationId);
String applicationCode = ApplicationCache.get(applicationId);
Terms behindApplicationIdTerms = frontApplicationIdBucket.getAggregations().get(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID);
for (Terms.Bucket behindApplicationIdBucket : behindApplicationIdTerms.getBuckets()) {
int behindApplicationId = behindApplicationIdBucket.getKeyAsNumber().intValue();
if (behindApplicationId != 0) {
String behindApplicationCode = ApplicationCache.getForUI(behindApplicationId);
String behindApplicationCode = ApplicationCache.get(behindApplicationId);
Sum s1LTE = behindApplicationIdBucket.getAggregations().get(NodeReferenceTable.COLUMN_S1_LTE);
Sum s3LTE = behindApplicationIdBucket.getAggregations().get(NodeReferenceTable.COLUMN_S3_LTE);
......
......@@ -75,7 +75,7 @@ public class ServiceEntryEsDAO extends EsDAO implements IServiceEntryDAO {
for (SearchHit searchHit : searchHits.getHits()) {
int applicationId = ((Number)searchHit.getSource().get(ServiceEntryTable.COLUMN_APPLICATION_ID)).intValue();
int entryServiceId = ((Number)searchHit.getSource().get(ServiceEntryTable.COLUMN_ENTRY_SERVICE_ID)).intValue();
String applicationCode = ApplicationCache.getForUI(applicationId);
String applicationCode = ApplicationCache.get(applicationId);
String entryServiceName = (String)searchHit.getSource().get(ServiceEntryTable.COLUMN_ENTRY_SERVICE_NAME);
JsonObject row = new JsonObject();
......
......@@ -146,7 +146,7 @@ public class ServiceReferenceEsDAO extends EsDAO implements IServiceReferenceDAO
String frontServiceName = frontServiceBucket.getKeyAsString();
if (StringUtils.isNotEmpty(frontServiceName)) {
String[] serviceNames = frontServiceName.split(Const.ID_SPLIT);
int frontServiceId = ServiceIdCache.getForUI(Integer.parseInt(serviceNames[0]), serviceNames[1]);
int frontServiceId = ServiceIdCache.get(Integer.parseInt(serviceNames[0]), serviceNames[1]);
parseSubAggregate(serviceReferenceMap, frontServiceBucket, frontServiceId);
}
}
......@@ -179,11 +179,11 @@ public class ServiceReferenceEsDAO extends EsDAO implements IServiceReferenceDAO
Sum summary = behindServiceIdBucket.getAggregations().get(ServiceReferenceTable.COLUMN_SUMMARY);
Sum costSum = behindServiceIdBucket.getAggregations().get(ServiceReferenceTable.COLUMN_COST_SUMMARY);
String frontServiceName = ServiceNameCache.getForUI(frontServiceId);
String frontServiceName = ServiceNameCache.get(frontServiceId);
if (StringUtils.isNotEmpty(frontServiceName)) {
frontServiceName = frontServiceName.split(Const.ID_SPLIT)[1];
}
String behindServiceName = ServiceNameCache.getForUI(behindServiceId);
String behindServiceName = ServiceNameCache.get(behindServiceId);
if (StringUtils.isNotEmpty(frontServiceName)) {
behindServiceName = behindServiceName.split(Const.ID_SPLIT)[1];
}
......@@ -216,9 +216,9 @@ public class ServiceReferenceEsDAO extends EsDAO implements IServiceReferenceDAO
Sum summary = behindServiceNameBucket.getAggregations().get(ServiceReferenceTable.COLUMN_SUMMARY);
Sum costSum = behindServiceNameBucket.getAggregations().get(ServiceReferenceTable.COLUMN_COST_SUMMARY);
String frontServiceName = ServiceNameCache.getForUI(frontServiceId);
String frontServiceName = ServiceNameCache.get(frontServiceId);
String[] serviceNames = behindServiceName.split(Const.ID_SPLIT);
int behindServiceId = ServiceIdCache.getForUI(Integer.parseInt(serviceNames[0]), serviceNames[1]);
int behindServiceId = ServiceIdCache.get(Integer.parseInt(serviceNames[0]), serviceNames[1]);
behindServiceName = serviceNames[1];
JsonObject serviceReference = new JsonObject();
......
......@@ -50,7 +50,7 @@ public class InstanceHealthService {
response.add("instances", instances);
instanceList.forEach(instance -> {
response.addProperty("applicationCode", ApplicationCache.getForUI(applicationId));
response.addProperty("applicationCode", ApplicationCache.get(applicationId));
response.addProperty("applicationId", applicationId);
IInstPerformanceDAO instPerformanceDAO = (IInstPerformanceDAO)DAOContainer.INSTANCE.get(IInstPerformanceDAO.class.getName());
......
......@@ -96,7 +96,7 @@ public class SpanService {
if (spanObject.getPeerId() == 0) {
peerJson.addProperty("value", spanObject.getPeer());
} else {
peerJson.addProperty("value", ApplicationCache.getForUI(spanObject.getPeerId()));
peerJson.addProperty("value", ApplicationCache.get(spanObject.getPeerId()));
}
tagsArray.add(peerJson);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册