Commit d80e2ff4 authored by pengys5

node component saved to ES successfully.

Parent 5ad301ea
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.agentstream;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.storage.PersistenceTimer;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
......@@ -32,5 +33,7 @@ public class AgentStreamModuleInstaller implements ModuleInstaller {
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder);
}
new PersistenceTimer().start();
}
}
......@@ -20,11 +20,11 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg
private final Logger logger = LoggerFactory.getLogger(TraceSegmentServiceHandler.class);
private SegmentParse segmentParse = new SegmentParse();
@Override public StreamObserver<UpstreamSegment> collect(StreamObserver<Downstream> responseObserver) {
return new StreamObserver<UpstreamSegment>() {
@Override public void onNext(UpstreamSegment segment) {
logger.debug("receive segment");
SegmentParse segmentParse = new SegmentParse();
try {
List<UniqueId> traceIds = segment.getGlobalTraceIdsList();
TraceSegmentObject segmentObject = TraceSegmentObject.parseFrom(segment.getSegment());
......@@ -39,6 +39,7 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg
}
@Override public void onCompleted() {
responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}
};
......
......@@ -4,6 +4,7 @@ package org.skywalking.apm.collector.agentstream.worker;
* @author pengys5
*/
public class CommonTable {
public static final String TABLE_TYPE = "type";
public static final String COLUMN_ID = "id";
public static final String COLUMN_AGG = "agg";
public static final String COLUMN_TIME_BUCKET = "time_bucket";
......
package org.skywalking.apm.collector.agentstream.worker;
/**
* @author pengys5
*/
public abstract class TimeSlice {
private String sliceType;
private long startTime;
private long endTime;
public TimeSlice(String sliceType, long startTime, long endTime) {
this.startTime = startTime;
this.endTime = endTime;
this.sliceType = sliceType;
}
public String getSliceType() {
return sliceType;
}
public long getStartTime() {
return startTime;
}
public long getEndTime() {
return endTime;
}
}
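
For orientation, TimeSlice above is new in this commit and only carries a slice type plus a time range. A minimal sketch of a concrete subclass (MinuteSlice and the "minute" type name are hypothetical, not part of this commit):

public class MinuteSlice extends TimeSlice {
    // Hypothetical subclass for illustration only.
    public MinuteSlice(long startTime, long endTime) {
        super("minute", startTime, endTime);
    }
}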
......@@ -4,17 +4,20 @@ import org.skywalking.apm.collector.agentstream.worker.node.component.define.Nod
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.WorkerRefs;
import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class NodeComponentAggWorker extends AggregationWorker {
public class NodeComponentAggregationWorker extends AggregationWorker {
public NodeComponentAggWorker(Role role, ClusterWorkerContext clusterContext) {
public NodeComponentAggregationWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
......@@ -22,19 +25,19 @@ public class NodeComponentAggWorker extends AggregationWorker {
super.preStart();
}
@Override protected void sendToNext() {
@Override protected WorkerRefs nextWorkRef(String id) throws WorkerNotFoundException {
return getClusterContext().lookup(NodeComponentRemoteWorker.WorkerRole.INSTANCE);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeComponentAggWorker> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeComponentAggregationWorker> {
@Override
public Role role() {
return Role.INSTANCE;
return WorkerRole.INSTANCE;
}
@Override
public NodeComponentAggWorker workerInstance(ClusterWorkerContext clusterContext) {
return new NodeComponentAggWorker(role(), clusterContext);
public NodeComponentAggregationWorker workerInstance(ClusterWorkerContext clusterContext) {
return new NodeComponentAggregationWorker(role(), clusterContext);
}
@Override
......@@ -43,17 +46,17 @@ public class NodeComponentAggWorker extends AggregationWorker {
}
}
public enum Role implements org.skywalking.apm.collector.stream.worker.Role {
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeComponentAggWorker.class.getSimpleName();
return NodeComponentAggregationWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
......
package org.skywalking.apm.collector.agentstream.worker.node.component;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.node.component.dao.INodeComponentDAO;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class NodeComponentPersistenceWorker extends PersistenceWorker {
public NodeComponentPersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected List<?> prepareBatch(Map<String, Data> dataMap) {
INodeComponentDAO dao = (INodeComponentDAO)DAOContainer.INSTANCE.get(INodeComponentDAO.class.getName());
return dao.prepareBatch(dataMap);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeComponentPersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeComponentPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new NodeComponentPersistenceWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeComponentPersistenceWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new NodeComponentDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.node.component;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
......@@ -20,6 +25,36 @@ public class NodeComponentRemoteWorker extends AbstractRemoteWorker {
}
@Override protected void onWork(Object message) throws WorkerException {
getClusterContext().lookup(NodeComponentPersistenceWorker.WorkerRole.INSTANCE).tell(message);
}
public static class Factory extends AbstractRemoteWorkerProvider<NodeComponentRemoteWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeComponentRemoteWorker workerInstance(ClusterWorkerContext clusterContext) {
return new NodeComponentRemoteWorker(role(), clusterContext);
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeComponentRemoteWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new NodeComponentDataDefine();
}
}
}
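
Taken together, the three node-component workers in this commit form a pipeline. A hedged sketch of the message flow as wired here:

// NodeComponentSpanListener.build()
//   -> NodeComponentAggregationWorker (local async, found via WorkerRole.INSTANCE)
//   -> NodeComponentRemoteWorker (target node chosen by HashCodeSelector)
//   -> NodeComponentPersistenceWorker (buffers Data in a double-buffered DataCache)
//   -> PersistenceTimer -> INodeComponentDAO.prepareBatch -> IBatchDAO.batchPersistence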
......@@ -8,6 +8,11 @@ import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -27,13 +32,13 @@ public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanLis
if (spanObject.getPeerId() == 0) {
peers = String.valueOf(spanObject.getPeerId());
}
String agg = spanObject.getComponent() + Const.ID_SPLIT + peers;
String agg = spanObject.getComponentId() + Const.ID_SPLIT + peers;
nodeComponents.add(agg);
}
@Override public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId) {
String peers = String.valueOf(applicationId);
String agg = spanObject.getComponent() + Const.ID_SPLIT + peers;
String agg = spanObject.getComponentId() + Const.ID_SPLIT + peers;
nodeComponents.add(agg);
}
......@@ -42,11 +47,18 @@ public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanLis
}
@Override public void build() {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
for (String agg : nodeComponents) {
NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
nodeComponent.setId(timeBucket + Const.ID_SPLIT + agg);
nodeComponent.setAgg(agg);
nodeComponent.setTimeBucket(timeBucket);
try {
logger.debug("send to node component aggregation worker, id: {}", nodeComponent.getId());
context.getClusterWorkerContext().lookup(NodeComponentAggregationWorker.WorkerRole.INSTANCE).tell(nodeComponent.transform());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
}
}
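
A worked example of the identifiers built above, with hypothetical values (componentId 1, peer "localhost:8080", time bucket 201707121505):

// agg = componentId + Const.ID_SPLIT + peers  ->  "1" + split + "localhost:8080"
// id  = timeBucket + Const.ID_SPLIT + agg     ->  "201707121505" + split + agg
// The id string seeds AbstractHashMessage (so HashCodeSelector routes all
// updates of one component consistently) and becomes the ES document id.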
package org.skywalking.apm.collector.agentstream.worker.node.component.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
public interface INodeComponentDAO {
List<?> prepareBatch(Map<String, Data> dataMap);
}
package org.skywalking.apm.collector.agentstream.worker.node.component.dao;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
public class NodeComponentEsDAO extends EsDAO implements INodeComponentDAO {
@Override public List<?> prepareBatch(Map<String, Data> dataMap) {
List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
dataMap.forEach((id, data) -> {
Map<String, Object> source = new HashMap<>();
source.put(NodeComponentTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeComponentTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
IndexRequestBuilder builder = getClient().prepareIndex(NodeComponentTable.TABLE, id).setSource(source);
indexRequestBuilders.add(builder);
});
return indexRequestBuilders;
}
}
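
For cross-reference, the positions read above line up with NodeComponent#transform() as added later in this commit:

// data.getDataString(1) -> agg         (transform() writes agg to string slot 1)
// data.getDataLong(0)   -> timeBucket  (transform() writes timeBucket to long slot 0)
// String slot 0 holds the id, which is passed separately as the ES document id.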
package org.skywalking.apm.collector.agentstream.worker.node.component.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
public class NodeComponentH2DAO extends H2DAO implements INodeComponentDAO {
@Override public List<?> prepareBatch(Map<String, Data> dataMap) {
return null;
}
}
......@@ -3,7 +3,9 @@ package org.skywalking.apm.collector.agentstream.worker.node.component.define;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
......@@ -34,7 +36,7 @@ public class NodeComponentDataDefine extends DataDefine {
return null;
}
public static class NodeComponent {
public static class NodeComponent implements TransformToData {
private String id;
private String agg;
private long timeBucket;
......@@ -48,6 +50,15 @@ public class NodeComponentDataDefine extends DataDefine {
public NodeComponent() {
}
@Override public Data transform() {
NodeComponentDataDefine define = new NodeComponentDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataString(1, this.agg);
data.setDataLong(0, this.timeBucket);
return data;
}
public String getId() {
return id;
}
......
......@@ -13,7 +13,7 @@ public class NodeComponentEsTableDefine extends ElasticSearchTableDefine {
}
@Override public int refreshInterval() {
return 0;
return 2;
}
@Override public int numberOfShards() {
......
......@@ -19,7 +19,7 @@ public class InstanceDataDefine extends DataDefine {
}
@Override protected int initialCapacity() {
return 3;
return 6;
}
@Override protected void attributeDefine() {
......@@ -28,7 +28,7 @@ public class InstanceDataDefine extends DataDefine {
addAttribute(2, new Attribute(InstanceTable.COLUMN_AGENTUUID, AttributeType.STRING, new CoverOperation()));
addAttribute(3, new Attribute(InstanceTable.COLUMN_REGISTER_TIME, AttributeType.LONG, new CoverOperation()));
addAttribute(4, new Attribute(InstanceTable.COLUMN_INSTANCE_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(4, new Attribute(InstanceTable.COLUMN_HEARTBEAT_TIME, AttributeType.LONG, new CoverOperation()));
addAttribute(5, new Attribute(InstanceTable.COLUMN_HEARTBEAT_TIME, AttributeType.LONG, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
......
......@@ -22,4 +22,8 @@ public class InstanceH2DAO extends H2DAO implements IInstanceDAO {
@Override public void save(InstanceDataDefine.Instance instance) {
}
@Override public void updateHeartbeatTime(int instanceId, long heartbeatTime) {
}
}
......@@ -63,6 +63,13 @@ public class SegmentParse {
}
}
}
notifyListenerToBuild();
}
private void notifyListenerToBuild() {
spanListeners.forEach(listener -> listener.build());
refsListeners.forEach(listener -> listener.build());
}
private void notifyExitListener(SpanObject spanObject, int applicationId, int applicationInstanceId) {
......
package org.skywalking.apm.collector.agentstream.worker.storage;
import java.util.List;
import org.skywalking.apm.collector.core.framework.Starter;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.dao.IBatchDAO;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.FlushAndSwitch;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorkerContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class PersistenceTimer implements Starter {
private final Logger logger = LoggerFactory.getLogger(PersistenceTimer.class);
public void start() {
logger.info("persistence timer start");
//TODO timer value config
// final long timeInterval = EsConfig.Es.Persistence.Timer.VALUE * 1000;
final long timeInterval = 3 * 1000;
Runnable runnable = () -> {
while (true) {
try {
extractDataAndSave();
Thread.sleep(timeInterval);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
}
};
Thread persistenceThread = new Thread(runnable);
persistenceThread.setName("timerPersistence");
persistenceThread.start();
}
private void extractDataAndSave() {
List<PersistenceWorker> workers = PersistenceWorkerContainer.INSTANCE.getPersistenceWorkers();
workers.forEach(worker -> {
try {
worker.allocateJob(new FlushAndSwitch());
List<?> batchCollection = worker.buildBatchCollection();
IBatchDAO dao = (IBatchDAO)DAOContainer.INSTANCE.get(IBatchDAO.class.getName());
dao.batchPersistence(batchCollection);
} catch (WorkerException e) {
logger.error(e.getMessage(), e);
}
});
}
}
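
The TODO above leaves the flush interval hard-coded at 3 seconds. A minimal sketch of how the hand-rolled sleep loop could be replaced with a scheduled executor once the interval is configurable (an assumption, not part of this commit):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical alternative body for start():
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleWithFixedDelay(this::extractDataAndSave, timeInterval, timeInterval, TimeUnit.MILLISECONDS);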
org.skywalking.apm.collector.agentstream.worker.register.application.dao.ApplicationEsDAO
org.skywalking.apm.collector.agentstream.worker.register.instance.dao.InstanceEsDAO
org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.ServiceNameEsDAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.ServiceNameEsDAO
org.skywalking.apm.collector.agentstream.worker.node.component.dao.NodeComponentEsDAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.application.dao.ApplicationH2DAO
org.skywalking.apm.collector.agentstream.worker.register.instance.dao.InstanceH2DAO
org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.ServiceNameH2DAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.ServiceNameH2DAO
org.skywalking.apm.collector.agentstream.worker.node.component.dao.NodeComponentH2DAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentAggWorker$Factory
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentAggregationWorker$Factory
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterSerialWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceRegisterSerialWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameRegisterSerialWorker$Factory
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceRegisterRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameRegisterRemoteWorker$Factory
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameRegisterRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentRemoteWorker$Factory
\ No newline at end of file
package org.skywalking.apm.collector.agentstream.grpc.handler;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import org.junit.Test;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.SpanLayer;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.SpanType;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.network.proto.TraceSegmentServiceGrpc;
import org.skywalking.apm.network.proto.UniqueId;
import org.skywalking.apm.network.proto.UpstreamSegment;
import org.skywalking.apm.network.trace.component.ComponentsDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class TraceSegmentServiceHandlerTestCase {
private final Logger logger = LoggerFactory.getLogger(TraceSegmentServiceHandlerTestCase.class);
private TraceSegmentServiceGrpc.TraceSegmentServiceStub stub;
@Test
public void testCollect() {
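// Integration-style test: assumes a collector gRPC server is already
// listening on localhost:11800. The 30s sleep below leaves time for the
// 3s PersistenceTimer to flush the segment into Elasticsearch.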
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build();
stub = TraceSegmentServiceGrpc.newStub(channel);
StreamObserver<UpstreamSegment> streamObserver = stub.collect(new StreamObserver<Downstream>() {
@Override public void onNext(Downstream downstream) {
}
@Override public void onError(Throwable throwable) {
logger.error(throwable.getMessage(), throwable);
}
@Override public void onCompleted() {
}
});
UpstreamSegment.Builder builder = UpstreamSegment.newBuilder();
buildGlobalTraceIds(builder);
buildSegment(builder);
streamObserver.onNext(builder.build());
streamObserver.onCompleted();
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
private void buildGlobalTraceIds(UpstreamSegment.Builder builder) {
UniqueId.Builder builder1 = UniqueId.newBuilder();
builder1.addIdParts(100);
builder1.addIdParts(100);
builder1.addIdParts(100);
builder.addGlobalTraceIds(builder1.build());
}
private void buildSegment(UpstreamSegment.Builder builder) {
long now = System.currentTimeMillis();
TraceSegmentObject.Builder segmentBuilder = TraceSegmentObject.newBuilder();
segmentBuilder.setApplicationId(1);
segmentBuilder.setApplicationInstanceId(1);
segmentBuilder.setTraceSegmentId(UniqueId.newBuilder().addIdParts(200).addIdParts(200).addIdParts(200).build());
SpanObject.Builder span_0 = SpanObject.newBuilder();
span_0.setSpanId(0);
span_0.setOperationName("/dubbox-case/case/dubbox-rest");
span_0.setOperationNameId(0);
span_0.setParentSpanId(-1);
span_0.setSpanLayer(SpanLayer.Http);
span_0.setStartTime(now);
span_0.setEndTime(now + 100000);
span_0.setComponentId(ComponentsDefine.TOMCAT.getId());
span_0.setIsError(false);
span_0.setSpanType(SpanType.Entry);
span_0.setPeerId(0);
span_0.setPeer("localhost:8080");
segmentBuilder.addSpans(span_0);
builder.setSegment(segmentBuilder.build().toByteString());
}
}
......@@ -52,10 +52,6 @@
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
......
......@@ -8,6 +8,7 @@ import java.util.concurrent.ExecutionException;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.update.UpdateRequest;
......@@ -112,6 +113,10 @@ public class ElasticSearchClient implements Client {
return client.prepareIndex(indexName, "type", id);
}
public BulkRequestBuilder prepareBulk() {
return client.prepareBulk();
}
public void update(UpdateRequest updateRequest) {
try {
client.update(updateRequest).get();
......
......@@ -2,7 +2,7 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</Pattern>
<Pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{70} - %msg%n</Pattern>
</layout>
</appender>
......
package org.skywalking.apm.collector.storage.dao;
import java.util.List;
/**
* @author pengys5
*/
public interface IBatchDAO {
void batchPersistence(List<?> batchCollection);
}
package org.skywalking.apm.collector.storage.elasticsearch.dao;
import java.util.List;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.storage.dao.IBatchDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class BatchEsDAO extends EsDAO implements IBatchDAO {
private final Logger logger = LoggerFactory.getLogger(BatchEsDAO.class);
@Override public void batchPersistence(List<?> batchCollection) {
BulkRequestBuilder bulkRequest = getClient().prepareBulk();
if (CollectionUtils.isNotEmpty(batchCollection)) {
logger.info("bulk data size: {}", batchCollection.size());
for (int i = 0; i < batchCollection.size(); i++) {
IndexRequestBuilder builder = (IndexRequestBuilder)batchCollection.get(i);
bulkRequest.add(builder);
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
logger.error(bulkResponse.buildFailureMessage());
}
}
}
}
package org.skywalking.apm.collector.storage.h2.dao;
import java.util.List;
import org.skywalking.apm.collector.storage.dao.IBatchDAO;
/**
* @author pengys5
*/
public class BatchH2DAO extends H2DAO implements IBatchDAO {
@Override public void batchPersistence(List<?> batchCollection) {
}
}
org.skywalking.apm.collector.storage.elasticsearch.dao.BatchEsDAO
\ No newline at end of file
org.skywalking.apm.collector.storage.h2.dao.BatchH2DAO
\ No newline at end of file
......@@ -66,7 +66,6 @@ public class StreamModuleInstaller implements ModuleInstaller {
List<AbstractRemoteWorkerProvider> remoteProviders = remoteProviderLoader.load();
for (AbstractRemoteWorkerProvider provider : remoteProviders) {
provider.setClusterContext(clusterWorkerContext);
// provider.create();
clusterWorkerContext.putRole(provider.role());
clusterWorkerContext.putProvider(provider);
}
......
......@@ -6,6 +6,8 @@ import org.skywalking.apm.collector.core.queue.QueueEventHandler;
import org.skywalking.apm.collector.core.queue.QueueExecutor;
import org.skywalking.apm.collector.queue.QueueModuleContext;
import org.skywalking.apm.collector.queue.QueueModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorkerContainer;
/**
* @author pengys5
......@@ -18,6 +20,10 @@ public abstract class AbstractLocalAsyncWorkerProvider<T extends AbstractLocalAs
T localAsyncWorker = workerInstance(getClusterContext());
localAsyncWorker.preStart();
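// New in this commit: persistence workers register themselves in the
// container so the PersistenceTimer can find and flush them periodically.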
if (localAsyncWorker instanceof PersistenceWorker) {
PersistenceWorkerContainer.INSTANCE.addWorker((PersistenceWorker)localAsyncWorker);
}
QueueCreator queueCreator = ((QueueModuleContext)CollectorContextHelper.INSTANCE.getContext(QueueModuleGroupDefine.GROUP_NAME)).getQueueCreator();
QueueEventHandler queueEventHandler = queueCreator.create(queueSize(), localAsyncWorker);
......
......@@ -5,12 +5,16 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class WorkerContext implements Context {
private final Logger logger = LoggerFactory.getLogger(WorkerContext.class);
private Map<String, RemoteWorkerRef> remoteWorkerRefs;
private Map<String, List<WorkerRef>> roleWorkers;
private Map<String, Role> roles;
......@@ -56,6 +60,7 @@ public abstract class WorkerContext implements Context {
}
@Override final public void put(WorkerRef workerRef) {
logger.debug("put worker reference into context, role name: {}", workerRef.getRole().roleName());
if (!getRoleWorkers().containsKey(workerRef.getRole().roleName())) {
getRoleWorkers().putIfAbsent(workerRef.getRole().roleName(), new ArrayList<>());
}
......
......@@ -6,23 +6,29 @@ import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.WorkerRefs;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class AggregationWorker extends AbstractLocalAsyncWorker {
private final Logger logger = LoggerFactory.getLogger(AggregationWorker.class);
private DataCache dataCache;
private int messageNum;
public AggregationWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
dataCache = new DataCache();
}
private int messageNum;
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
......@@ -41,14 +47,34 @@ public abstract class AggregationWorker extends AbstractLocalAsyncWorker {
}
}
protected abstract void sendToNext();
protected abstract WorkerRefs nextWorkRef(String id) throws WorkerNotFoundException;
private void sendToNext() throws WorkerException {
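// Rotate the double buffer, spin until concurrent writers release the
// retired window, then forward each aggregated entry to the worker
// resolved by nextWorkRef(id).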
dataCache.switchPointer();
while (dataCache.getLast().isHolding()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new WorkerException(e.getMessage(), e);
}
}
dataCache.getLast().asMap().forEach((id, data) -> {
try {
nextWorkRef(id).tell(data);
} catch (WorkerNotFoundException | WorkerInvokeException e) {
logger.error(e.getMessage(), e);
}
});
}
protected final void aggregate(Object message) {
Data data = (Data)message;
dataCache.hold();
if (dataCache.containsKey(data.id())) {
getClusterContext().getDataDefine(data.getDefineId()).mergeData(data, dataCache.get(data.id()));
} else {
dataCache.put(data.id(), data);
}
dataCache.release();
}
}
package org.skywalking.apm.collector.stream.worker.impl;
/**
* @author pengys5
*/
public class FlushAndSwitch {
}
package org.skywalking.apm.collector.stream.worker.impl;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.core.queue.EndOfBatchCommand;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class PersistenceWorker extends AbstractLocalAsyncWorker {
private final Logger logger = LoggerFactory.getLogger(PersistenceWorker.class);
private DataCache dataCache;
public PersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
dataCache = new DataCache();
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
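// Flush signals (EndOfBatchCommand from the queue, FlushAndSwitch from the
// PersistenceTimer) rotate the double-buffered cache; ordinary messages are
// merged into the current window, which rotates once it reaches 1000 entries.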
@Override protected final void onWork(Object message) throws WorkerException {
if (message instanceof EndOfBatchCommand || message instanceof FlushAndSwitch) {
if (dataCache.trySwitchPointer()) {
dataCache.switchPointer();
}
} else {
if (dataCache.currentCollectionSize() >= 1000) {
if (dataCache.trySwitchPointer()) {
dataCache.switchPointer();
}
}
aggregate(message);
}
}
public List<?> buildBatchCollection() throws WorkerException {
List<?> batchCollection;
try {
while (dataCache.getLast().isHolding()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
logger.warn("persistence worker thread woken up while waiting for window release");
}
}
batchCollection = prepareBatch(dataCache.getLast().asMap());
} finally {
dataCache.releaseLast();
}
return batchCollection;
}
protected abstract List<?> prepareBatch(Map<String, Data> dataMap);
private void aggregate(Object message) {
dataCache.hold();
Data data = (Data)message;
if (dataCache.containsKey(data.id())) {
getClusterContext().getDataDefine(data.getDefineId()).mergeData(data, dataCache.get(data.id()));
} else {
if (dataCache.currentCollectionSize() < 1000) {
dataCache.put(data.id(), data);
}
}
dataCache.release();
}
}
package org.skywalking.apm.collector.stream.worker.impl;
import java.util.ArrayList;
import java.util.List;
/**
* @author pengys5
*/
public enum PersistenceWorkerContainer {
INSTANCE;
private List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
public void addWorker(PersistenceWorker worker) {
persistenceWorkers.add(worker);
}
public List<PersistenceWorker> getPersistenceWorkers() {
return persistenceWorkers;
}
}
package org.skywalking.apm.collector.stream.worker.impl.data;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.selector.AbstractHashMessage;
/**
* @author pengys5
*/
public class Data {
public class Data extends AbstractHashMessage {
private int defineId;
private final int stringCapacity;
private final int longCapacity;
......@@ -16,7 +17,8 @@ public class Data {
private Float[] dataFloats;
private Integer[] dataIntegers;
public Data(int defineId, int stringCapacity, int longCapacity, int floatCapacity, int integerCapacity) {
public Data(String id, int defineId, int stringCapacity, int longCapacity, int floatCapacity, int integerCapacity) {
super(id);
this.defineId = defineId;
this.dataStrings = new String[stringCapacity];
this.dataLongs = new Long[longCapacity];
......
......@@ -23,6 +23,10 @@ public class DataCache extends Window {
lockedDataCollection = getCurrentAndHold();
}
public int currentCollectionSize() {
return getCurrentAndHold().size();
}
public void release() {
lockedDataCollection.release();
lockedDataCollection = null;
......
......@@ -13,13 +13,10 @@ public abstract class DataDefine {
private int integerCapacity;
public DataDefine() {
stringCapacity = 0;
longCapacity = 0;
floatCapacity = 0;
integerCapacity = 0;
initial();
}
public final void initial() {
private void initial() {
attributes = new Attribute[initialCapacity()];
attributeDefine();
for (Attribute attribute : attributes) {
......@@ -45,8 +42,8 @@ public abstract class DataDefine {
protected abstract void attributeDefine();
public final Data build() {
return new Data(defineId(), stringCapacity, longCapacity, floatCapacity, integerCapacity);
public final Data build(String id) {
return new Data(id, defineId(), stringCapacity, longCapacity, floatCapacity, integerCapacity);
}
public void mergeData(Data newData, Data oldData) {
......
......@@ -22,7 +22,6 @@ public class DataDefineLoader implements Loader<Map<Integer, DataDefine>> {
DefinitionLoader<DataDefine> definitionLoader = DefinitionLoader.load(DataDefine.class, definitionFile);
for (DataDefine dataDefine : definitionLoader) {
logger.info("loaded data definition class: {}", dataDefine.getClass().getName());
dataDefine.initial();
dataDefineMap.put(dataDefine.defineId(), dataDefine);
}
return dataDefineMap;
......
package org.skywalking.apm.collector.stream.worker.impl.data;
/**
* @author pengys5
*/
public interface TransformToData {
Data transform();
}
package org.skywalking.apm.collector.stream.worker.impl.data;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author pengys5
*/
public abstract class Window {
private AtomicInteger windowSwitch = new AtomicInteger(0);
private DataCollection pointer;
private DataCollection windowDataA;
......@@ -16,6 +20,15 @@ public abstract class Window {
pointer = windowDataA;
}
public boolean trySwitchPointer() {
if (windowSwitch.incrementAndGet() == 1) {
return true;
} else {
windowSwitch.addAndGet(-1);
return false;
}
}
public void switchPointer() {
if (pointer == windowDataA) {
pointer = windowDataB;
......@@ -41,4 +54,9 @@ public abstract class Window {
return windowDataA;
}
}
public void releaseLast() {
getLast().clear();
windowSwitch.addAndGet(-1);
}
}
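
A hedged sketch of the ownership protocol the counter above implies, condensed into one call site for illustration (in this commit the switch happens in PersistenceWorker.onWork and the drain in buildBatchCollection):

// Only one thread may own the retired window at a time.
if (window.trySwitchPointer()) { // counter 0 -> 1: this thread owns the switch
    window.switchPointer();      // retire the currently filling window
    // ... drain window.getLast() ...
    window.releaseLast();        // clear it and drop the counter back to 0
}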
......@@ -39,5 +39,15 @@
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
</dependencies>
</project>