提交 dcad78c4 编写于 作者: P peng-yongsheng

Analysis segment parser module finished.

上级 3afb0c4a
......@@ -20,7 +20,7 @@
package org.apache.skywalking.apm.collector.agent.stream;
import java.util.Properties;
import org.apache.skywalking.apm.collector.agent.stream.buffer.BufferFileConfig;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.buffer.BufferFileConfig;
import org.apache.skywalking.apm.collector.agent.stream.service.jvm.IGCMetricService;
import org.apache.skywalking.apm.collector.agent.stream.service.jvm.IInstanceHeartBeatService;
import org.apache.skywalking.apm.collector.agent.stream.service.jvm.IMemoryMetricService;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.segment.parser.define.graph;
/**
* @author peng-yongsheng
*/
public class GraphIdDefine {
public static final int SEGMENT_PERSISTENCE_GRAPH_ID = 100;
public static final int SEGMENT_STANDARDIZATION_GRAPH_ID = 101;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.segment.parser.define.graph;
/**
* @author peng-yongsheng
*/
public class WorkerIdDefine {
public static final int SEGMENT_PERSISTENCE_WORKER_ID = 100;
public static final int SEGMENT_STANDARDIZATION_WORKER_ID = 101;
}
......@@ -38,7 +38,7 @@
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>layer-register-define</artifactId>
<artifactId>register-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
......@@ -46,5 +46,15 @@
<artifactId>collector-cache-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>collector-storage-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>analysis-worker-model</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -22,7 +22,10 @@ import java.util.Properties;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.AnalysisSegmentParserModule;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParseService;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParserListenerRegister;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.buffer.SegmentBufferReader;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.SegmentParserListenerManager;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.SegmentPersistenceGraph;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization.SegmentStandardizationGraph;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.service.SegmentParseService;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.service.SegmentParserListenerRegister;
import org.apache.skywalking.apm.collector.core.module.Module;
......@@ -52,7 +55,12 @@ public class AnalysisSegmentParserModuleProvider extends ModuleProvider {
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
SegmentPersistenceGraph segmentPersistenceGraph = new SegmentPersistenceGraph(getManager());
segmentPersistenceGraph.create();
SegmentStandardizationGraph segmentStandardizationGraph = new SegmentStandardizationGraph(getManager());
segmentStandardizationGraph.create();
SegmentBufferReader.INSTANCE.setSegmentParserListenerManager(listenerManager);
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
......
......@@ -16,8 +16,7 @@
*
*/
package org.apache.skywalking.apm.collector.agent.stream.buffer;
package org.apache.skywalking.apm.collector.analysis.segment.parser.provider.buffer;
import java.io.File;
import java.io.FilenameFilter;
......@@ -27,7 +26,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.collector.core.util.CollectionUtils;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.agent.stream.util.FileUtils;
import org.apache.skywalking.apm.collector.core.util.FileUtils;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -16,8 +16,7 @@
*
*/
package org.apache.skywalking.apm.collector.agent.stream.buffer;
package org.apache.skywalking.apm.collector.analysis.segment.parser.provider.buffer;
import com.google.protobuf.CodedOutputStream;
import java.io.File;
......@@ -27,7 +26,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.collector.agent.stream.parser.SegmentParse;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParseService;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.SegmentParse;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.SegmentParserListenerManager;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.CollectionUtils;
import org.apache.skywalking.apm.collector.core.util.Const;
......@@ -45,12 +46,17 @@ public enum SegmentBufferReader {
private final Logger logger = LoggerFactory.getLogger(SegmentBufferReader.class);
private InputStream inputStream;
private ModuleManager moduleManager;
private SegmentParserListenerManager listenerManager;
public void initialize(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::preRead, 3, 3, TimeUnit.SECONDS);
}
public void setSegmentParserListenerManager(SegmentParserListenerManager listenerManager) {
this.listenerManager = listenerManager;
}
private void preRead() {
String readFileName = OffsetManager.INSTANCE.getReadFileName();
if (StringUtils.isNotEmpty(readFileName)) {
......@@ -121,8 +127,8 @@ public enum SegmentBufferReader {
while (readFile.length() > readFileOffset && readFileOffset < endPoint) {
UpstreamSegment upstreamSegment = UpstreamSegment.parser().parseDelimitedFrom(inputStream);
SegmentParse parse = new SegmentParse(moduleManager);
if (!parse.parse(upstreamSegment, SegmentParse.Source.Buffer)) {
SegmentParse parse = new SegmentParse(moduleManager, listenerManager);
if (!parse.parse(upstreamSegment, ISegmentParseService.Source.Buffer)) {
return false;
}
......
......@@ -20,10 +20,10 @@ package org.apache.skywalking.apm.collector.analysis.segment.parser.provider.par
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.List;
import javax.swing.text.Segment;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.ReferenceDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SegmentDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SpanDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.graph.GraphIdDefine;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.EntrySpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.ExitSpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.FirstSpanListener;
......@@ -39,6 +39,7 @@ import org.apache.skywalking.apm.collector.core.graph.Graph;
import org.apache.skywalking.apm.collector.core.graph.GraphManager;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.table.segment.Segment;
import org.apache.skywalking.apm.network.proto.SpanType;
import org.apache.skywalking.apm.network.proto.TraceSegmentObject;
import org.apache.skywalking.apm.network.proto.UniqueId;
......@@ -159,7 +160,7 @@ public class SegmentParse {
Segment segment = new Segment(id);
segment.setDataBinary(dataBinary);
segment.setTimeBucket(timeBucket);
Graph<Segment> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SEGMENT_GRAPH_ID, Segment.class);
Graph<Segment> graph = GraphManager.INSTANCE.findGraph(GraphIdDefine.SEGMENT_PERSISTENCE_GRAPH_ID, Segment.class);
graph.start(segment);
}
......@@ -167,7 +168,7 @@ public class SegmentParse {
logger.debug("push to segment buffer write worker, id: {}", id);
SegmentStandardization standardization = new SegmentStandardization(id);
standardization.setUpstreamSegment(upstreamSegment);
Graph<SegmentStandardization> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SEGMENT_STANDARDIZATION_GRAPH_ID, SegmentStandardization.class);
Graph<SegmentStandardization> graph = GraphManager.INSTANCE.findGraph(GraphIdDefine.SEGMENT_STANDARDIZATION_GRAPH_ID, SegmentStandardization.class);
graph.start(standardization);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.graph.GraphIdDefine;
import org.apache.skywalking.apm.collector.core.graph.GraphManager;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.storage.table.segment.Segment;
/**
* @author peng-yongsheng
*/
public class SegmentPersistenceGraph {
private final ModuleManager moduleManager;
public SegmentPersistenceGraph(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
public void create() {
GraphManager.INSTANCE.createIfAbsent(GraphIdDefine.SEGMENT_PERSISTENCE_GRAPH_ID, Segment.class)
.addNode(new SegmentPersistenceWorker.Factory(moduleManager).create(null));
}
}
......@@ -16,17 +16,16 @@
*
*/
package org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser;
package org.apache.skywalking.apm.collector.agent.stream.worker.trace.segment;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.graph.WorkerIdDefine;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorkerProvider;
import org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorker;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.table.segment.Segment;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorkerProvider;
import org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorker;
/**
* @author peng-yongsheng
......@@ -38,7 +37,7 @@ public class SegmentPersistenceWorker extends PersistenceWorker<Segment, Segment
}
@Override public int id() {
return 117;
return WorkerIdDefine.SEGMENT_PERSISTENCE_WORKER_ID;
}
@Override protected boolean needMergeDBData() {
......@@ -50,8 +49,8 @@ public class SegmentPersistenceWorker extends PersistenceWorker<Segment, Segment
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<Segment, Segment, SegmentPersistenceWorker> {
public Factory(ModuleManager moduleManager, QueueCreatorService<Segment> queueCreatorService) {
super(moduleManager, queueCreatorService);
public Factory(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public SegmentPersistenceWorker workerInstance(ModuleManager moduleManager) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.graph.GraphIdDefine;
import org.apache.skywalking.apm.collector.core.graph.GraphManager;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
/**
* @author peng-yongsheng
*/
public class SegmentStandardizationGraph {
private final ModuleManager moduleManager;
public SegmentStandardizationGraph(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
public void create() {
GraphManager.INSTANCE.createIfAbsent(GraphIdDefine.SEGMENT_STANDARDIZATION_GRAPH_ID, SegmentStandardization.class)
.addNode(new SegmentStandardizationWorker.Factory(moduleManager).create(null));
}
}
......@@ -16,17 +16,16 @@
*
*/
package org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.graph.WorkerIdDefine;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.buffer.SegmentBufferManager;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorker;
import org.apache.skywalking.apm.collector.agent.stream.buffer.SegmentBufferManager;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorkerProvider;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -43,7 +42,7 @@ public class SegmentStandardizationWorker extends AbstractLocalAsyncWorker<Segme
}
@Override public int id() {
return 108;
return WorkerIdDefine.SEGMENT_STANDARDIZATION_WORKER_ID;
}
@Override protected void onWork(SegmentStandardization segmentStandardization) throws WorkerException {
......@@ -56,8 +55,8 @@ public class SegmentStandardizationWorker extends AbstractLocalAsyncWorker<Segme
public static class Factory extends AbstractLocalAsyncWorkerProvider<SegmentStandardization, SegmentStandardization, SegmentStandardizationWorker> {
public Factory(ModuleManager moduleManager, QueueCreatorService<SegmentStandardization> queueCreatorService) {
super(moduleManager, queueCreatorService);
public Factory(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public SegmentStandardizationWorker workerInstance(ModuleManager moduleManager) {
......
......@@ -33,7 +33,7 @@ public abstract class AbstractLocalAsyncWorkerProvider<INPUT, OUTPUT, WORKER_TYP
}
@Override
public final WorkerRef create(WorkerCreateListener workerCreateListener) {
public final WorkerRef<INPUT, OUTPUT> create(WorkerCreateListener workerCreateListener) {
WORKER_TYPE localAsyncWorker = workerInstance(getModuleManager());
workerCreateListener.addWorker(localAsyncWorker);
......
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.collector.core.graph;
import java.util.HashMap;
......@@ -46,7 +45,7 @@ public enum GraphManager {
}
}
public Graph findGraph(int graphId) {
public <INPUT> Graph<INPUT> findGraph(int graphId, Class<INPUT> input) {
Graph graph = allGraphs.get(graphId);
if (graph == null) {
throw new GraphNotFoundException("Graph id=" + graphId + " not found in this GraphManager");
......
......@@ -17,13 +17,12 @@
*/
package org.apache.skywalking.apm.collector.agent.stream.util;
package org.apache.skywalking.apm.collector.core.util;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册