提交 eaa02545 编写于 作者: P peng-yongsheng

Test success with standalone and cluster mode.

上级 f3e85ef6
......@@ -40,8 +40,11 @@ public class SegmentPost {
serviceNameRegisterPost.send("json/servicename-register-provider.json");
JsonElement provider = JsonFileReader.INSTANCE.read("json/dubbox-provider.json");
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", provider.toString());
JsonElement consumer = JsonFileReader.INSTANCE.read("json/dubbox-consumer.json");
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", consumer.toString());
for (int i = 0; i < 1000; i++) {
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", provider.toString());
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", consumer.toString());
}
}
}
......@@ -29,6 +29,7 @@ import org.skywalking.apm.collector.agent.stream.service.register.IApplicationID
import org.skywalking.apm.collector.agent.stream.service.register.IInstanceIDService;
import org.skywalking.apm.collector.agent.stream.service.register.IServiceNameService;
import org.skywalking.apm.collector.agent.stream.service.trace.ITraceSegmentService;
import org.skywalking.apm.collector.agent.stream.worker.AgentStreamRemoteDataRegister;
import org.skywalking.apm.collector.agent.stream.worker.jvm.CpuMetricService;
import org.skywalking.apm.collector.agent.stream.worker.jvm.GCMetricService;
import org.skywalking.apm.collector.agent.stream.worker.jvm.InstanceHeartBeatService;
......@@ -42,6 +43,8 @@ import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
import org.skywalking.apm.collector.storage.StorageModule;
/**
......@@ -75,6 +78,10 @@ public class AgentStreamModuleProvider extends ModuleProvider {
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
RemoteDataRegisterService remoteDataRegisterService = getManager().find(RemoteModule.NAME).getService(RemoteDataRegisterService.class);
AgentStreamRemoteDataRegister agentStreamRemoteDataRegister = new AgentStreamRemoteDataRegister(remoteDataRegisterService);
agentStreamRemoteDataRegister.register();
AgentStreamBootStartup bootStartup = new AgentStreamBootStartup(getManager());
bootStartup.start();
}
......
......@@ -26,13 +26,19 @@ import java.util.Properties;
public class BufferFileConfig {
static int BUFFER_OFFSET_MAX_FILE_SIZE = 10 * 1024 * 1024;
static int BUFFER_SEGMENT_MAX_FILE_SIZE = 10 * 1024 * 1024;
static String BUFFER_PATH = "../buffer/";
private static final String BUFFER_PATH_KEY = "buffer_file_path";
private static final String BUFFER_OFFSET_MAX_FILE_SIZE_KEY = "buffer_offset_max_file_size";
private static final String BUFFER_SEGMENT_MAX_FILE_SIZE_KEY = "buffer_segment_max_file_size";
public static class Parser {
public void parse(Properties config) {
if (config.containsKey(BUFFER_PATH_KEY)) {
BUFFER_PATH = config.getProperty(BUFFER_PATH_KEY);
}
if (config.containsKey(BUFFER_OFFSET_MAX_FILE_SIZE_KEY)) {
String sizeStr = config.getProperty(BUFFER_OFFSET_MAX_FILE_SIZE_KEY).toUpperCase();
if (sizeStr.endsWith("K")) {
......
......@@ -49,7 +49,7 @@ public enum OffsetManager {
public synchronized void initialize() throws IOException {
if (!initialized) {
this.offset = new Offset();
File dataPath = new File(SegmentBufferConfig.BUFFER_PATH);
File dataPath = new File(BufferFileConfig.BUFFER_PATH);
if (dataPath.mkdirs()) {
createOffsetFile();
} else {
......@@ -77,7 +77,7 @@ public enum OffsetManager {
private void createOffsetFile() throws IOException {
String timeBucket = String.valueOf(TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis()));
String offsetFileName = OFFSET_FILE_PREFIX + "_" + timeBucket + "." + Const.FILE_SUFFIX;
offsetFile = new File(SegmentBufferConfig.BUFFER_PATH + offsetFileName);
offsetFile = new File(BufferFileConfig.BUFFER_PATH + offsetFileName);
this.offset.getWriteOffset().setWriteFileName(Const.EMPTY_STRING);
this.offset.getWriteOffset().setWriteFileOffset(0);
this.offset.getReadOffset().setReadFileName(Const.EMPTY_STRING);
......@@ -99,7 +99,7 @@ public enum OffsetManager {
private void nextFile() {
String timeBucket = String.valueOf(TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis()));
String offsetFileName = OFFSET_FILE_PREFIX + "_" + timeBucket + "." + Const.FILE_SUFFIX;
File newOffsetFile = new File(SegmentBufferConfig.BUFFER_PATH + offsetFileName);
File newOffsetFile = new File(BufferFileConfig.BUFFER_PATH + offsetFileName);
offsetFile.delete();
offsetFile = newOffsetFile;
this.flush();
......
......@@ -44,14 +44,14 @@ public enum SegmentBufferManager {
logger.info("segment buffer initialize");
try {
OffsetManager.INSTANCE.initialize();
if (new File(SegmentBufferConfig.BUFFER_PATH).mkdirs()) {
if (new File(BufferFileConfig.BUFFER_PATH).mkdirs()) {
newDataFile();
} else {
String writeFileName = OffsetManager.INSTANCE.getWriteFileName();
if (StringUtils.isNotEmpty(writeFileName)) {
File dataFile = new File(SegmentBufferConfig.BUFFER_PATH + writeFileName);
File dataFile = new File(BufferFileConfig.BUFFER_PATH + writeFileName);
if (dataFile.exists()) {
outputStream = new FileOutputStream(new File(SegmentBufferConfig.BUFFER_PATH + writeFileName), true);
outputStream = new FileOutputStream(new File(BufferFileConfig.BUFFER_PATH + writeFileName), true);
} else {
newDataFile();
}
......@@ -83,7 +83,7 @@ public enum SegmentBufferManager {
logger.debug("create new segment buffer file");
String timeBucket = String.valueOf(TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis()));
String writeFileName = DATA_FILE_PREFIX + "_" + timeBucket + "." + Const.FILE_SUFFIX;
File dataFile = new File(SegmentBufferConfig.BUFFER_PATH + writeFileName);
File dataFile = new File(BufferFileConfig.BUFFER_PATH + writeFileName);
dataFile.createNewFile();
OffsetManager.INSTANCE.setWriteOffset(writeFileName, 0);
try {
......@@ -98,6 +98,5 @@ public enum SegmentBufferManager {
}
public synchronized void flush() {
}
}
......@@ -53,7 +53,7 @@ public enum SegmentBufferReader {
private void preRead() {
String readFileName = OffsetManager.INSTANCE.getReadFileName();
if (StringUtils.isNotEmpty(readFileName)) {
File readFile = new File(SegmentBufferConfig.BUFFER_PATH + readFileName);
File readFile = new File(BufferFileConfig.BUFFER_PATH + readFileName);
if (readFile.exists()) {
deleteTheDataFilesBeforeReadFile(readFileName);
long readFileOffset = OffsetManager.INSTANCE.getReadFileOffset();
......@@ -69,7 +69,7 @@ public enum SegmentBufferReader {
}
private void deleteTheDataFilesBeforeReadFile(String readFileName) {
File[] dataFiles = new File(SegmentBufferConfig.BUFFER_PATH).listFiles(new PrefixFileNameFilter());
File[] dataFiles = new File(BufferFileConfig.BUFFER_PATH).listFiles(new PrefixFileNameFilter());
long readFileCreateTime = getFileCreateTime(readFileName);
for (File dataFile : dataFiles) {
......@@ -90,7 +90,7 @@ public enum SegmentBufferReader {
private void readEarliestCreateDataFile() {
String readFileName = OffsetManager.INSTANCE.getReadFileName();
File[] dataFiles = new File(SegmentBufferConfig.BUFFER_PATH).listFiles(new PrefixFileNameFilter());
File[] dataFiles = new File(BufferFileConfig.BUFFER_PATH).listFiles(new PrefixFileNameFilter());
if (CollectionUtils.isNotEmpty(dataFiles)) {
if (dataFiles[0].getName().equals(readFileName)) {
......
......@@ -163,7 +163,7 @@ public class SegmentParse {
}
private void writeToBufferFile(String id, UpstreamSegment upstreamSegment) {
logger.debug("send to segment buffer write worker, id: {}", id);
logger.debug("push to segment buffer write worker, id: {}", id);
SegmentStandardization standardization = new SegmentStandardization(id);
standardization.setUpstreamSegment(upstreamSegment);
Graph<SegmentStandardization> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SEGMENT_STANDARDIZATION_GRAPH_ID, SegmentStandardization.class);
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker;
import org.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
import org.skywalking.apm.collector.storage.table.node.NodeComponent;
import org.skywalking.apm.collector.storage.table.node.NodeMapping;
import org.skywalking.apm.collector.storage.table.noderef.NodeReference;
import org.skywalking.apm.collector.storage.table.register.Application;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.skywalking.apm.collector.storage.table.register.ServiceName;
import org.skywalking.apm.collector.storage.table.service.ServiceEntry;
import org.skywalking.apm.collector.storage.table.serviceref.ServiceReference;
/**
* @author peng-yongsheng
*/
public class AgentStreamRemoteDataRegister {
private final RemoteDataRegisterService remoteDataRegisterService;
public AgentStreamRemoteDataRegister(RemoteDataRegisterService remoteDataRegisterService) {
this.remoteDataRegisterService = remoteDataRegisterService;
}
public void register() {
remoteDataRegisterService.register(Application.class, Application::new);
remoteDataRegisterService.register(Instance.class, Instance::new);
remoteDataRegisterService.register(ServiceName.class, ServiceName::new);
remoteDataRegisterService.register(NodeComponent.class, NodeComponent::new);
remoteDataRegisterService.register(NodeMapping.class, NodeMapping::new);
remoteDataRegisterService.register(NodeReference.class, NodeReference::new);
remoteDataRegisterService.register(ServiceEntry.class, ServiceEntry::new);
remoteDataRegisterService.register(ServiceReference.class, ServiceReference::new);
}
}
......@@ -23,6 +23,7 @@ import org.skywalking.apm.collector.agent.stream.service.jvm.ICpuMetricService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.storage.table.jvm.CpuMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -34,10 +35,13 @@ public class CpuMetricService implements ICpuMetricService {
private final Logger logger = LoggerFactory.getLogger(CpuMetricService.class);
private final Graph<CpuMetric> cpuMetricGraph;
private Graph<CpuMetric> cpuMetricGraph;
public CpuMetricService() {
cpuMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.CPU_METRIC_GRAPH_ID, CpuMetric.class);
private Graph<CpuMetric> getCpuMetricGraph() {
if (ObjectUtils.isEmpty(cpuMetricGraph)) {
cpuMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.CPU_METRIC_GRAPH_ID, CpuMetric.class);
}
return cpuMetricGraph;
}
@Override public void send(int instanceId, long timeBucket, double usagePercent) {
......@@ -46,7 +50,7 @@ public class CpuMetricService implements ICpuMetricService {
cpuMetric.setUsagePercent(usagePercent);
cpuMetric.setTimeBucket(timeBucket);
logger.debug("send to cpu metric graph, id: {}", cpuMetric.getId());
cpuMetricGraph.start(cpuMetric);
logger.debug("push to cpu metric graph, id: {}", cpuMetric.getId());
getCpuMetricGraph().start(cpuMetric);
}
}
......@@ -23,6 +23,7 @@ import org.skywalking.apm.collector.agent.stream.service.jvm.IGCMetricService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.storage.table.jvm.GCMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -34,10 +35,13 @@ public class GCMetricService implements IGCMetricService {
private final Logger logger = LoggerFactory.getLogger(GCMetricService.class);
private final Graph<GCMetric> gcMetricGraph;
private Graph<GCMetric> gcMetricGraph;
public GCMetricService() {
gcMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.GC_METRIC_GRAPH_ID, GCMetric.class);
private Graph<GCMetric> getGcMetricGraph() {
if (ObjectUtils.isEmpty(gcMetricGraph)) {
gcMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.GC_METRIC_GRAPH_ID, GCMetric.class);
}
return gcMetricGraph;
}
@Override public void send(int instanceId, long timeBucket, int phraseValue, long count, long time) {
......@@ -48,7 +52,7 @@ public class GCMetricService implements IGCMetricService {
gcMetric.setTime(time);
gcMetric.setTimeBucket(timeBucket);
logger.debug("send to gc metric graph, id: {}", gcMetric.getId());
gcMetricGraph.start(gcMetric);
logger.debug("push to gc metric graph, id: {}", gcMetric.getId());
getGcMetricGraph().start(gcMetric);
}
}
......@@ -22,6 +22,7 @@ import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph;
import org.skywalking.apm.collector.agent.stream.service.jvm.IInstanceHeartBeatService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.slf4j.Logger;
......@@ -34,10 +35,13 @@ public class InstanceHeartBeatService implements IInstanceHeartBeatService {
private final Logger logger = LoggerFactory.getLogger(InstanceHeartBeatService.class);
private final Graph<Instance> heartBeatGraph;
private Graph<Instance> heartBeatGraph;
public InstanceHeartBeatService() {
this.heartBeatGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.INST_HEART_BEAT_GRAPH_ID, Instance.class);
private Graph<Instance> getHeartBeatGraph() {
if (ObjectUtils.isEmpty(heartBeatGraph)) {
this.heartBeatGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.INST_HEART_BEAT_GRAPH_ID, Instance.class);
}
return heartBeatGraph;
}
@Override public void send(int instanceId, long heartBeatTime) {
......@@ -45,7 +49,7 @@ public class InstanceHeartBeatService implements IInstanceHeartBeatService {
instance.setHeartBeatTime(TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartBeatTime));
instance.setInstanceId(instanceId);
logger.debug("send to instance heart beat persistence worker, id: {}", instance.getId());
heartBeatGraph.start(instance);
logger.debug("push to instance heart beat persistence worker, id: {}", instance.getId());
getHeartBeatGraph().start(instance);
}
}
......@@ -23,6 +23,7 @@ import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryMetricServic
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.storage.table.jvm.MemoryMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -34,10 +35,13 @@ public class MemoryMetricService implements IMemoryMetricService {
private final Logger logger = LoggerFactory.getLogger(MemoryMetricService.class);
private final Graph<MemoryMetric> memoryMetricGraph;
private Graph<MemoryMetric> memoryMetricGraph;
public MemoryMetricService() {
this.memoryMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.MEMORY_METRIC_GRAPH_ID, MemoryMetric.class);
private Graph<MemoryMetric> getMemoryMetricGraph() {
if (ObjectUtils.isEmpty(memoryMetricGraph)) {
this.memoryMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.MEMORY_METRIC_GRAPH_ID, MemoryMetric.class);
}
return memoryMetricGraph;
}
@Override
......@@ -51,7 +55,7 @@ public class MemoryMetricService implements IMemoryMetricService {
memoryMetric.setCommitted(commited);
memoryMetric.setTimeBucket(timeBucket);
logger.debug("send to memory metric graph, id: {}", memoryMetric.getId());
memoryMetricGraph.start(memoryMetric);
logger.debug("push to memory metric graph, id: {}", memoryMetric.getId());
getMemoryMetricGraph().start(memoryMetric);
}
}
......@@ -23,6 +23,7 @@ import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryPoolMetricSe
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -34,10 +35,13 @@ public class MemoryPoolMetricService implements IMemoryPoolMetricService {
private final Logger logger = LoggerFactory.getLogger(MemoryPoolMetricService.class);
private final Graph<MemoryPoolMetric> memoryPoolMetricGraph;
private Graph<MemoryPoolMetric> memoryPoolMetricGraph;
public MemoryPoolMetricService() {
this.memoryPoolMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.MEMORY_POOL_METRIC_GRAPH_ID, MemoryPoolMetric.class);
private Graph<MemoryPoolMetric> getMemoryPoolMetricGraph() {
if (ObjectUtils.isEmpty(memoryPoolMetricGraph)) {
this.memoryPoolMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.MEMORY_POOL_METRIC_GRAPH_ID, MemoryPoolMetric.class);
}
return memoryPoolMetricGraph;
}
@Override
......@@ -51,7 +55,7 @@ public class MemoryPoolMetricService implements IMemoryPoolMetricService {
memoryPoolMetric.setCommitted(commited);
memoryPoolMetric.setTimeBucket(timeBucket);
logger.debug("send to memory pool metric graph, id: {}", memoryPoolMetric.getId());
memoryPoolMetricGraph.start(memoryPoolMetric);
logger.debug("push to memory pool metric graph, id: {}", memoryPoolMetric.getId());
getMemoryPoolMetricGraph().start(memoryPoolMetric);
}
}
......@@ -25,6 +25,7 @@ import org.skywalking.apm.collector.cache.service.ApplicationCacheService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.storage.table.register.Application;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -37,23 +38,36 @@ public class ApplicationIDService implements IApplicationIDService {
private final Logger logger = LoggerFactory.getLogger(ApplicationIDService.class);
private final ModuleManager moduleManager;
private final Graph<Application> applicationRegisterGraph;
private ApplicationCacheService applicationCacheService;
private Graph<Application> applicationRegisterGraph;
public ApplicationIDService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.applicationRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.APPLICATION_REGISTER_GRAPH_ID, Application.class);
}
private Graph<Application> getApplicationRegisterGraph() {
if (ObjectUtils.isEmpty(applicationRegisterGraph)) {
this.applicationRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.APPLICATION_REGISTER_GRAPH_ID, Application.class);
}
return this.applicationRegisterGraph;
}
private ApplicationCacheService getApplicationCacheService() {
if (ObjectUtils.isEmpty(applicationCacheService)) {
this.applicationCacheService = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
}
return applicationCacheService;
}
public int getOrCreate(String applicationCode) {
ApplicationCacheService service = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
int applicationId = service.get(applicationCode);
int applicationId = getApplicationCacheService().get(applicationCode);
if (applicationId == 0) {
Application application = new Application(applicationCode);
application.setApplicationCode(applicationCode);
application.setApplicationId(0);
applicationRegisterGraph.start(application);
getApplicationRegisterGraph().start(application);
}
return applicationId;
}
......
......@@ -25,6 +25,7 @@ import org.skywalking.apm.collector.cache.service.InstanceCacheService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.dao.IInstanceRegisterDAO;
import org.skywalking.apm.collector.storage.table.register.Instance;
......@@ -39,17 +40,38 @@ public class InstanceIDService implements IInstanceIDService {
private final Logger logger = LoggerFactory.getLogger(InstanceIDService.class);
private final ModuleManager moduleManager;
private final Graph<Instance> instanceRegisterGraph;
private InstanceCacheService instanceCacheService;
private Graph<Instance> instanceRegisterGraph;
private IInstanceRegisterDAO instanceRegisterDAO;
public InstanceIDService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.instanceRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.INSTANCE_REGISTER_GRAPH_ID, Instance.class);
}
private InstanceCacheService getInstanceCacheService() {
if (ObjectUtils.isEmpty(instanceCacheService)) {
instanceCacheService = moduleManager.find(CacheModule.NAME).getService(InstanceCacheService.class);
}
return instanceCacheService;
}
private Graph<Instance> getInstanceRegisterGraph() {
if (ObjectUtils.isEmpty(instanceRegisterGraph)) {
this.instanceRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.INSTANCE_REGISTER_GRAPH_ID, Instance.class);
}
return instanceRegisterGraph;
}
private IInstanceRegisterDAO getInstanceRegisterDAO() {
if (ObjectUtils.isEmpty(instanceRegisterDAO)) {
instanceRegisterDAO = moduleManager.find(StorageModule.NAME).getService(IInstanceRegisterDAO.class);
}
return instanceRegisterDAO;
}
public int getOrCreate(int applicationId, String agentUUID, long registerTime, String osInfo) {
logger.debug("get or create instance id, application id: {}, agentUUID: {}, registerTime: {}, osInfo: {}", applicationId, agentUUID, registerTime, osInfo);
InstanceCacheService service = moduleManager.find(CacheModule.NAME).getService(InstanceCacheService.class);
int instanceId = service.getInstanceId(applicationId, agentUUID);
int instanceId = getInstanceCacheService().getInstanceId(applicationId, agentUUID);
if (instanceId == 0) {
Instance instance = new Instance("0");
......@@ -60,15 +82,13 @@ public class InstanceIDService implements IInstanceIDService {
instance.setInstanceId(0);
instance.setOsInfo(osInfo);
instanceRegisterGraph.start(instance);
getInstanceRegisterGraph().start(instance);
}
return instanceId;
}
public void recover(int instanceId, int applicationId, long registerTime, String osInfo) {
logger.debug("instance recover, instance id: {}, application id: {}, register time: {}", instanceId, applicationId, registerTime);
final IInstanceRegisterDAO instanceRegisterDAO = moduleManager.find(StorageModule.NAME).getService(IInstanceRegisterDAO.class);
Instance instance = new Instance(String.valueOf(instanceId));
instance.setApplicationId(applicationId);
instance.setAgentUUID("");
......@@ -77,6 +97,6 @@ public class InstanceIDService implements IInstanceIDService {
instance.setInstanceId(instanceId);
instance.setOsInfo(osInfo);
instanceRegisterDAO.save(instance);
getInstanceRegisterDAO().save(instance);
}
}
......@@ -39,7 +39,7 @@ public class InstanceRegisterRemoteWorker extends AbstractRemoteWorker<Instance,
return InstanceRegisterRemoteWorker.class.hashCode();
}
public InstanceRegisterRemoteWorker(ModuleManager moduleManager) {
InstanceRegisterRemoteWorker(ModuleManager moduleManager) {
super(moduleManager);
}
......
......@@ -25,6 +25,7 @@ import org.skywalking.apm.collector.cache.service.ServiceIdCacheService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.storage.table.register.ServiceName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -37,16 +38,29 @@ public class ServiceNameService implements IServiceNameService {
private final Logger logger = LoggerFactory.getLogger(ServiceNameService.class);
private final ModuleManager moduleManager;
private final Graph<ServiceName> serviceNameRegisterGraph;
private ServiceIdCacheService serviceIdCacheService;
private Graph<ServiceName> serviceNameRegisterGraph;
public ServiceNameService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.serviceNameRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.SERVICE_NAME_REGISTER_GRAPH_ID, ServiceName.class);
}
private ServiceIdCacheService getServiceIdCacheService() {
if (ObjectUtils.isEmpty(serviceIdCacheService)) {
serviceIdCacheService = moduleManager.find(CacheModule.NAME).getService(ServiceIdCacheService.class);
}
return serviceIdCacheService;
}
private Graph<ServiceName> getServiceNameRegisterGraph() {
if (ObjectUtils.isEmpty(serviceNameRegisterGraph)) {
this.serviceNameRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.SERVICE_NAME_REGISTER_GRAPH_ID, ServiceName.class);
}
return serviceNameRegisterGraph;
}
public int getOrCreate(int applicationId, String serviceName) {
ServiceIdCacheService idCacheService = moduleManager.find(CacheModule.NAME).getService(ServiceIdCacheService.class);
int serviceId = idCacheService.get(applicationId, serviceName);
int serviceId = getServiceIdCacheService().get(applicationId, serviceName);
if (serviceId == 0) {
ServiceName service = new ServiceName("0");
......@@ -54,7 +68,7 @@ public class ServiceNameService implements IServiceNameService {
service.setServiceName(serviceName);
service.setServiceId(0);
serviceNameRegisterGraph.start(service);
getServiceNameRegisterGraph().start(service);
}
return serviceId;
}
......
......@@ -31,7 +31,7 @@ import org.skywalking.apm.collector.stream.worker.base.WorkerException;
*/
public class NodeComponentRemoteWorker extends AbstractRemoteWorker<NodeComponent, NodeComponent> {
public NodeComponentRemoteWorker(ModuleManager moduleManager) {
NodeComponentRemoteWorker(ModuleManager moduleManager) {
super(moduleManager);
}
......
......@@ -31,7 +31,7 @@ import org.skywalking.apm.collector.stream.worker.base.WorkerException;
*/
public class NodeMappingRemoteWorker extends AbstractRemoteWorker<NodeMapping, NodeMapping> {
public NodeMappingRemoteWorker(ModuleManager moduleManager) {
NodeMappingRemoteWorker(ModuleManager moduleManager) {
super(moduleManager);
}
......
......@@ -67,7 +67,7 @@ public class NodeMappingSpanListener implements RefsListener, FirstSpanListener
for (NodeMapping nodeMapping : nodeMappings) {
nodeMapping.setId(timeBucket + Const.ID_SPLIT + nodeMapping.getId());
nodeMapping.setTimeBucket(timeBucket);
logger.debug("send to node mapping aggregation worker, id: {}", nodeMapping.getId());
logger.debug("push to node mapping aggregation worker, id: {}", nodeMapping.getId());
graph.start(nodeMapping);
}
}
......
......@@ -31,7 +31,7 @@ import org.skywalking.apm.collector.stream.worker.base.WorkerException;
*/
public class NodeReferenceRemoteWorker extends AbstractRemoteWorker<NodeReference, NodeReference> {
public NodeReferenceRemoteWorker(ModuleManager moduleManager) {
NodeReferenceRemoteWorker(ModuleManager moduleManager) {
super(moduleManager);
}
......
......@@ -84,7 +84,7 @@ public class ServiceEntrySpanListener implements RefsListener, FirstSpanListener
serviceEntry.setRegisterTime(timeBucket);
serviceEntry.setNewestTime(timeBucket);
logger.debug("send to service entry aggregation worker, id: {}", serviceEntry.getId());
logger.debug("push to service entry aggregation worker, id: {}", serviceEntry.getId());
Graph<ServiceEntry> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SERVICE_ENTRY_GRAPH_ID, ServiceEntry.class);
graph.start(serviceEntry);
}
......
......@@ -130,7 +130,7 @@ public class ServiceReferenceSpanListener implements FirstSpanListener, EntrySpa
serviceReference.setId(idBuilder.toString());
serviceReference.setTimeBucket(timeBucket);
logger.debug("send to service reference aggregation worker, id: {}", serviceReference.getId());
logger.debug("push to service reference aggregation worker, id: {}", serviceReference.getId());
Graph<ServiceReference> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SERVICE_REFERENCE_GRAPH_ID, ServiceReference.class);
graph.start(serviceReference);
......
......@@ -5,8 +5,12 @@
naming:
jetty:
host: localhost
port: 10800
port: 12800
context_path: /
remote:
gRPC:
host: localhost
port: 10800
agent_gRPC:
gRPC:
host: localhost
......@@ -18,6 +22,7 @@ agent_jetty:
context_path: /
agent_stream:
default:
buffer_file_path: ../buffer/
buffer_offset_max_file_size: 10M
buffer_segment_max_file_size: 500M
ui:
......@@ -25,10 +30,10 @@ ui:
host: localhost
port: 12800
context_path: /
storage:
elasticsearch:
cluster_name: CollectorDBCluster
cluster_transport_sniffer: true
cluster_nodes: localhost:9300
index_shards_number: 2
index_replicas_number: 0
\ No newline at end of file
#storage:
# elasticsearch:
# cluster_name: CollectorDBCluster
# cluster_transport_sniffer: true
# cluster_nodes: localhost:9300
# index_shards_number: 2
# index_replicas_number: 0
......@@ -17,7 +17,7 @@
~ Project repository: https://github.com/OpenSkywalking/skywalking
-->
<Configuration status="debug">
<Configuration status="info">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
......@@ -29,7 +29,7 @@
<logger name="org.skywalking.apm.collector.agent.grpc.handler.JVMMetricsServiceHandler" level="INFO"/>
<logger name="org.skywalking.apm.collector.stream.timer.PersistenceTimer" level="INFO"/>
<logger name="io.grpc.netty" level="INFO"/>
<Root level="debug">
<Root level="info">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
......
......@@ -36,5 +36,10 @@
<artifactId>collector-cache-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -23,6 +23,7 @@ import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.cache.service.ApplicationCacheService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.dao.IApplicationCacheDAO;
......@@ -38,22 +39,30 @@ public class ApplicationCacheGuavaService implements ApplicationCacheService {
private final Cache<String, Integer> codeCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(1000).build();
private final IApplicationCacheDAO applicationCacheDAO;
private final ModuleManager moduleManager;
private IApplicationCacheDAO applicationCacheDAO;
public ApplicationCacheGuavaService(ModuleManager moduleManager) {
this.applicationCacheDAO = moduleManager.find(StorageModule.NAME).getService(IApplicationCacheDAO.class);
this.moduleManager = moduleManager;
}
private IApplicationCacheDAO getApplicationCacheDAO() {
if (ObjectUtils.isEmpty(applicationCacheDAO)) {
this.applicationCacheDAO = moduleManager.find(StorageModule.NAME).getService(IApplicationCacheDAO.class);
}
return this.applicationCacheDAO;
}
public int get(String applicationCode) {
int applicationId = 0;
try {
applicationId = codeCache.get(applicationCode, () -> applicationCacheDAO.getApplicationId(applicationCode));
applicationId = codeCache.get(applicationCode, () -> getApplicationCacheDAO().getApplicationId(applicationCode));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (applicationId == 0) {
applicationId = applicationCacheDAO.getApplicationId(applicationCode);
applicationId = getApplicationCacheDAO().getApplicationId(applicationCode);
if (applicationId != 0) {
codeCache.put(applicationCode, applicationId);
}
......@@ -66,13 +75,13 @@ public class ApplicationCacheGuavaService implements ApplicationCacheService {
public String get(int applicationId) {
String applicationCode = Const.EMPTY_STRING;
try {
applicationCode = idCache.get(applicationId, () -> applicationCacheDAO.getApplicationCode(applicationId));
applicationCode = idCache.get(applicationId, () -> getApplicationCacheDAO().getApplicationCode(applicationId));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (StringUtils.isEmpty(applicationCode)) {
applicationCode = applicationCacheDAO.getApplicationCode(applicationId);
applicationCode = getApplicationCacheDAO().getApplicationCode(applicationId);
if (StringUtils.isNotEmpty(applicationCode)) {
codeCache.put(applicationCode, applicationId);
}
......
......@@ -23,6 +23,7 @@ import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.cache.service.InstanceCacheService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.dao.IInstanceCacheDAO;
import org.slf4j.Logger;
......@@ -39,23 +40,31 @@ public class InstanceCacheGuavaService implements InstanceCacheService {
private final Cache<String, Integer> stringCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(5000).build();
private final IInstanceCacheDAO instanceCacheDAO;
private final ModuleManager moduleManager;
private IInstanceCacheDAO instanceCacheDAO;
public InstanceCacheGuavaService(ModuleManager moduleManager) {
this.instanceCacheDAO = moduleManager.find(StorageModule.NAME).getService(IInstanceCacheDAO.class);
this.moduleManager = moduleManager;
}
private IInstanceCacheDAO getInstanceCacheDAO() {
if (ObjectUtils.isEmpty(instanceCacheDAO)) {
this.instanceCacheDAO = moduleManager.find(StorageModule.NAME).getService(IInstanceCacheDAO.class);
}
return this.instanceCacheDAO;
}
public int get(int applicationInstanceId) {
int applicationId = 0;
try {
applicationId = integerCache.get(applicationInstanceId, () -> instanceCacheDAO.getApplicationId(applicationInstanceId));
applicationId = integerCache.get(applicationInstanceId, () -> getInstanceCacheDAO().getApplicationId(applicationInstanceId));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (applicationId == 0) {
applicationId = instanceCacheDAO.getApplicationId(applicationInstanceId);
applicationId = getInstanceCacheDAO().getApplicationId(applicationInstanceId);
if (applicationId != 0) {
integerCache.put(applicationInstanceId, applicationId);
}
......@@ -68,13 +77,13 @@ public class InstanceCacheGuavaService implements InstanceCacheService {
int instanceId = 0;
try {
instanceId = stringCache.get(key, () -> instanceCacheDAO.getInstanceId(applicationId, agentUUID));
instanceId = stringCache.get(key, () -> getInstanceCacheDAO().getInstanceId(applicationId, agentUUID));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (instanceId == 0) {
instanceId = instanceCacheDAO.getInstanceId(applicationId, agentUUID);
instanceId = getInstanceCacheDAO().getInstanceId(applicationId, agentUUID);
if (applicationId != 0) {
stringCache.put(key, instanceId);
}
......
......@@ -23,6 +23,7 @@ import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.cache.service.ServiceIdCacheService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO;
import org.slf4j.Logger;
......@@ -37,22 +38,30 @@ public class ServiceIdCacheGuavaService implements ServiceIdCacheService {
private final Cache<String, Integer> serviceIdCache = CacheBuilder.newBuilder().maximumSize(1000).build();
private final IServiceNameCacheDAO serviceNameCacheDAO;
private final ModuleManager moduleManager;
private IServiceNameCacheDAO serviceNameCacheDAO;
public ServiceIdCacheGuavaService(ModuleManager moduleManager) {
this.serviceNameCacheDAO = moduleManager.find(StorageModule.NAME).getService(IServiceNameCacheDAO.class);
this.moduleManager = moduleManager;
}
private IServiceNameCacheDAO getServiceNameCacheDAO() {
if (ObjectUtils.isEmpty(serviceNameCacheDAO)) {
this.serviceNameCacheDAO = moduleManager.find(StorageModule.NAME).getService(IServiceNameCacheDAO.class);
}
return this.serviceNameCacheDAO;
}
public int get(int applicationId, String serviceName) {
int serviceId = 0;
try {
serviceId = serviceIdCache.get(applicationId + Const.ID_SPLIT + serviceName, () -> serviceNameCacheDAO.getServiceId(applicationId, serviceName));
serviceId = serviceIdCache.get(applicationId + Const.ID_SPLIT + serviceName, () -> getServiceNameCacheDAO().getServiceId(applicationId, serviceName));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (serviceId == 0) {
serviceId = serviceNameCacheDAO.getServiceId(applicationId, serviceName);
serviceId = getServiceNameCacheDAO().getServiceId(applicationId, serviceName);
if (serviceId != 0) {
serviceIdCache.put(applicationId + Const.ID_SPLIT + serviceName, serviceId);
}
......
......@@ -23,6 +23,7 @@ import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.cache.service.ServiceNameCacheService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO;
......@@ -38,22 +39,30 @@ public class ServiceNameCacheGuavaService implements ServiceNameCacheService {
private final Cache<Integer, String> serviceNameCache = CacheBuilder.newBuilder().maximumSize(10000).build();
private final IServiceNameCacheDAO serviceNameCacheDAO;
private final ModuleManager moduleManager;
private IServiceNameCacheDAO serviceNameCacheDAO;
public ServiceNameCacheGuavaService(ModuleManager moduleManager) {
this.serviceNameCacheDAO = moduleManager.find(StorageModule.NAME).getService(IServiceNameCacheDAO.class);
this.moduleManager = moduleManager;
}
private IServiceNameCacheDAO getServiceNameCacheDAO() {
if (ObjectUtils.isEmpty(serviceNameCacheDAO)) {
this.serviceNameCacheDAO = moduleManager.find(StorageModule.NAME).getService(IServiceNameCacheDAO.class);
}
return this.serviceNameCacheDAO;
}
public String get(int serviceId) {
String serviceName = Const.EMPTY_STRING;
try {
serviceName = serviceNameCache.get(serviceId, () -> serviceNameCacheDAO.getServiceName(serviceId));
serviceName = serviceNameCache.get(serviceId, () -> getServiceNameCacheDAO().getServiceName(serviceId));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (StringUtils.isEmpty(serviceName)) {
serviceName = serviceNameCacheDAO.getServiceName(serviceId);
serviceName = getServiceNameCacheDAO().getServiceName(serviceId);
if (StringUtils.isNotEmpty(serviceName)) {
serviceNameCache.put(serviceId, serviceName);
}
......
......@@ -32,6 +32,34 @@
<artifactId>snakeyaml</artifactId>
<groupId>org.yaml</groupId>
</exclusion>
<exclusion>
<artifactId>netty-common</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>netty-transport</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>netty-codec</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>netty-codec-http</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>netty-buffer</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>netty-handler</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>netty-resolver</artifactId>
<groupId>io.netty</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
......@@ -50,9 +78,15 @@
</exclusions>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
<version>1.4.0</version>
<groupId>org.skywalking</groupId>
<artifactId>apm-network</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>
......@@ -21,6 +21,12 @@
<groupId>org.skywalking</groupId>
<artifactId>apm-network</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
......
......@@ -16,11 +16,28 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.core.config;
package org.skywalking.apm.collector.core.cache;
/**
* @author peng-yongsheng
*/
public class SystemConfig {
public static String DATA_PATH = "../data";
public interface Collection<Data> {
void reading();
boolean isReading();
void writing();
boolean isWriting();
void clear();
int size();
void finishReading();
void finishWriting();
Data collection();
}
......@@ -16,28 +16,30 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.worker.impl.data;
package org.skywalking.apm.collector.core.cache;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author peng-yongsheng
*/
public abstract class Window {
public abstract class Window<WindowCollection extends Collection> {
private AtomicInteger windowSwitch = new AtomicInteger(0);
private DataCollection pointer;
private WindowCollection pointer;
private DataCollection windowDataA;
private DataCollection windowDataB;
private WindowCollection windowDataA;
private WindowCollection windowDataB;
public Window() {
windowDataA = new DataCollection();
windowDataB = new DataCollection();
pointer = windowDataA;
protected Window() {
this.windowDataA = collectionInstance();
this.windowDataB = collectionInstance();
this.pointer = windowDataA;
}
public abstract WindowCollection collectionInstance();
public boolean trySwitchPointer() {
return windowSwitch.incrementAndGet() == 1 && !getLast().isReading();
}
......@@ -55,7 +57,7 @@ public abstract class Window {
getLast().reading();
}
protected DataCollection getCurrentAndWriting() {
protected WindowCollection getCurrentAndWriting() {
if (pointer == windowDataA) {
windowDataA.writing();
return windowDataA;
......@@ -65,11 +67,11 @@ public abstract class Window {
}
}
protected DataCollection getCurrent() {
protected WindowCollection getCurrent() {
return pointer;
}
public DataCollection getLast() {
public WindowCollection getLast() {
if (pointer == windowDataA) {
return windowDataB;
} else {
......
......@@ -183,6 +183,7 @@ public abstract class Data extends EndOfBatchQueueMessage {
for (Boolean dataBoolean : dataBooleans) {
dataStr.append(dataBoolean).append(",");
}
dataStr.append("]");
return dataStr.toString();
}
}
......@@ -15,6 +15,20 @@ naming:
host: localhost
port: 10800
context_path: /
agent_gRPC:
gRPC:
host: localhost
port: 11800
agent_jetty:
jetty:
host: localhost
port: 12800
context_path: /
agent_stream:
default:
buffer_file_path: ../buffer/
buffer_offset_max_file_size: 10M
buffer_segment_max_file_size: 500M
ui:
jetty:
host: localhost
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.remote;
import org.skywalking.apm.collector.core.CollectorException;
/**
* @author peng-yongsheng
*/
public abstract class RemoteException extends CollectorException {
public RemoteException(String message) {
super(message);
}
public RemoteException(String message, Throwable cause) {
super(message, cause);
}
}
......@@ -19,8 +19,8 @@
package org.skywalking.apm.collector.remote;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.remote.service.RemoteServerService;
/**
* @author peng-yongsheng
......@@ -34,6 +34,6 @@ public class RemoteModule extends Module {
}
@Override public Class[] services() {
return new Class[] {RemoteServerService.class, RemoteSenderService.class};
return new Class[] {RemoteSenderService.class, RemoteDataRegisterService.class};
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.remote.service;
import java.util.HashMap;
import java.util.Map;
import org.skywalking.apm.collector.core.data.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class CommonRemoteDataRegisterService implements RemoteDataRegisterService, RemoteDataIDGetter, RemoteDataInstanceCreatorGetter {
private final Logger logger = LoggerFactory.getLogger(CommonRemoteDataRegisterService.class);
private Integer id;
private final Map<Class<? extends Data>, Integer> dataClassMapping;
private final Map<Integer, RemoteDataInstanceCreator> dataInstanceCreatorMapping;
public CommonRemoteDataRegisterService() {
this.id = 1;
this.dataClassMapping = new HashMap<>();
this.dataInstanceCreatorMapping = new HashMap<>();
}
@Override public void register(Class<? extends Data> dataClass, RemoteDataInstanceCreator instanceCreator) {
if (!dataClassMapping.containsKey(dataClass)) {
dataClassMapping.put(dataClass, this.id);
dataInstanceCreatorMapping.put(this.id, instanceCreator);
this.id++;
} else {
logger.warn("The data class {} was registered.", dataClass.getName());
}
}
@Override
public Integer getRemoteDataId(Class<? extends Data> dataClass) throws RemoteDataMappingIdNotFoundException {
if (dataClassMapping.containsKey(dataClass)) {
return dataClassMapping.get(dataClass);
} else {
throw new RemoteDataMappingIdNotFoundException("Could not found the id of remote data class " + dataClass.getName());
}
}
@Override public RemoteDataInstanceCreator getInstanceCreator(
Integer remoteDataId) throws RemoteDataInstanceCreatorNotFoundException {
if (dataInstanceCreatorMapping.containsKey(remoteDataId)) {
return dataInstanceCreatorMapping.get(remoteDataId);
} else {
throw new RemoteDataInstanceCreatorNotFoundException("Could not found the instance creator of remote data id " + remoteDataId);
}
}
}
......@@ -26,7 +26,7 @@ import org.skywalking.apm.collector.core.data.Data;
public interface RemoteClient extends Comparable<RemoteClient> {
String getAddress();
void send(int graphId, int nodeId, Data data);
void push(int graphId, int nodeId, Data data);
boolean equals(String address);
}
......@@ -24,5 +24,5 @@ import org.skywalking.apm.collector.core.module.Service;
* @author peng-yongsheng
*/
public interface RemoteClientService extends Service {
RemoteClient create(String host, int port);
RemoteClient create(String host, int port, int channelSize, int bufferSize);
}
......@@ -23,8 +23,6 @@ import org.skywalking.apm.collector.core.data.Data;
/**
* @author peng-yongsheng
*/
public interface DataReceiver<Output extends Data> {
Output output(int graphId, int nodeId);
void receive(Output data);
public interface RemoteDataIDGetter {
Integer getRemoteDataId(Class<? extends Data> dataClass) throws RemoteDataMappingIdNotFoundException;
}
......@@ -18,11 +18,10 @@
package org.skywalking.apm.collector.remote.service;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface RemoteServerService extends Service {
void registerReceiver(DataReceiver receiver);
public interface RemoteDataInstanceCreatorGetter {
RemoteDataRegisterService.RemoteDataInstanceCreator getInstanceCreator(
Integer remoteDataId) throws RemoteDataInstanceCreatorNotFoundException;
}
......@@ -18,18 +18,13 @@
package org.skywalking.apm.collector.remote.service;
import org.skywalking.apm.collector.remote.RemoteException;
/**
* @author peng-yongsheng
*/
public class DataReceiverRegisterListener {
private DataReceiver dataReceiver;
public DataReceiver getDataReceiver() {
return dataReceiver;
}
public void setDataReceiver(DataReceiver dataReceiver) {
this.dataReceiver = dataReceiver;
public class RemoteDataInstanceCreatorNotFoundException extends RemoteException {
public RemoteDataInstanceCreatorNotFoundException(String message) {
super(message);
}
}
......@@ -16,13 +16,15 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.buffer;
package org.skywalking.apm.collector.remote.service;
import org.skywalking.apm.collector.core.config.SystemConfig;
import org.skywalking.apm.collector.remote.RemoteException;
/**
* @author peng-yongsheng
*/
public class SegmentBufferConfig {
public static String BUFFER_PATH = SystemConfig.DATA_PATH + "/buffer/";
public class RemoteDataMappingIdNotFoundException extends RemoteException {
public RemoteDataMappingIdNotFoundException(String message) {
super(message);
}
}
......@@ -16,21 +16,18 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.core.config;
package org.skywalking.apm.collector.remote.service;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public enum SystemConfigParser {
INSTANCE;
public interface RemoteDataRegisterService extends Service {
void register(Class<? extends Data> dataClass, RemoteDataInstanceCreator instanceCreator);
private static final String DATA_PATH = "data.path";
public void parse() {
if (!StringUtils.isEmpty(System.getProperty(DATA_PATH))) {
SystemConfig.DATA_PATH = System.getProperty(DATA_PATH);
}
interface RemoteDataInstanceCreator<RemoteData extends Data> {
RemoteData createInstance(String id);
}
}
......@@ -41,6 +41,11 @@
<artifactId>collector-grpc-manager-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-datacarrier</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
......
......@@ -12,10 +12,9 @@ import org.skywalking.apm.collector.grpc.manager.service.GRPCManagerService;
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.remote.grpc.handler.RemoteCommonServiceHandler;
import org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSenderService;
import org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteServerService;
import org.skywalking.apm.collector.remote.service.DataReceiverRegisterListener;
import org.skywalking.apm.collector.remote.service.CommonRemoteDataRegisterService;
import org.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.remote.service.RemoteServerService;
import org.skywalking.apm.collector.server.Server;
/**
......@@ -24,10 +23,14 @@ import org.skywalking.apm.collector.server.Server;
public class RemoteModuleGRPCProvider extends ModuleProvider {
public static final String NAME = "gRPC";
private static final String HOST = "host";
private static final String PORT = "port";
private final DataReceiverRegisterListener listener = new DataReceiverRegisterListener();
private static final String CHANNEL_SIZE = "channel_size";
private static final String BUFFER_SIZE = "buffer_size";
private GRPCRemoteSenderService remoteSenderService;
private CommonRemoteDataRegisterService remoteDataRegisterService;
@Override public String name() {
return NAME;
......@@ -40,10 +43,13 @@ public class RemoteModuleGRPCProvider extends ModuleProvider {
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
String host = config.getProperty(HOST);
Integer port = (Integer)config.get(PORT);
Integer channelSize = (Integer)config.getOrDefault(CHANNEL_SIZE, 5);
Integer bufferSize = (Integer)config.getOrDefault(BUFFER_SIZE, 1000);
remoteSenderService = new GRPCRemoteSenderService(host, port);
this.registerServiceImplementation(RemoteServerService.class, new GRPCRemoteServerService(listener));
remoteDataRegisterService = new CommonRemoteDataRegisterService();
remoteSenderService = new GRPCRemoteSenderService(host, port, channelSize, bufferSize, remoteDataRegisterService);
this.registerServiceImplementation(RemoteSenderService.class, remoteSenderService);
this.registerServiceImplementation(RemoteDataRegisterService.class, remoteDataRegisterService);
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
......@@ -52,7 +58,7 @@ public class RemoteModuleGRPCProvider extends ModuleProvider {
GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class);
Server gRPCServer = managerService.createIfAbsent(host, port);
gRPCServer.addHandler(new RemoteCommonServiceHandler(listener));
gRPCServer.addHandler(new RemoteCommonServiceHandler(remoteDataRegisterService));
ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class);
moduleRegisterService.register(RemoteModule.NAME, this.name(), new RemoteModuleGRPCRegistration(host, port));
......
......@@ -20,12 +20,16 @@ package org.skywalking.apm.collector.remote.grpc.handler;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.graph.Next;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.remote.grpc.proto.Empty;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteMessage;
import org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteDeserializeService;
import org.skywalking.apm.collector.remote.service.DataReceiverRegisterListener;
import org.skywalking.apm.collector.remote.service.RemoteDataInstanceCreatorGetter;
import org.skywalking.apm.collector.remote.service.RemoteDataInstanceCreatorNotFoundException;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -37,24 +41,31 @@ public class RemoteCommonServiceHandler extends RemoteCommonServiceGrpc.RemoteCo
private final Logger logger = LoggerFactory.getLogger(RemoteCommonServiceHandler.class);
private final DataReceiverRegisterListener listener;
private final RemoteDataInstanceCreatorGetter instanceCreatorGetter;
private final GRPCRemoteDeserializeService service;
public RemoteCommonServiceHandler(DataReceiverRegisterListener listener) {
this.listener = listener;
public RemoteCommonServiceHandler(RemoteDataInstanceCreatorGetter instanceCreatorGetter) {
this.instanceCreatorGetter = instanceCreatorGetter;
this.service = new GRPCRemoteDeserializeService();
}
@SuppressWarnings("unchecked")
@Override public StreamObserver<RemoteMessage> call(StreamObserver<Empty> responseObserver) {
return new StreamObserver<RemoteMessage>() {
@Override public void onNext(RemoteMessage message) {
int graphId = message.getGraphId();
int nodeId = message.getNodeId();
int remoteDataId = message.getRemoteDataId();
RemoteData remoteData = message.getRemoteData();
Data output = listener.getDataReceiver().output(graphId, nodeId);
service.deserialize(remoteData, output);
listener.getDataReceiver().receive(output);
try {
Data output = instanceCreatorGetter.getInstanceCreator(remoteDataId).createInstance(Const.EMPTY_STRING);
service.deserialize(remoteData, output);
Next next = GraphManager.INSTANCE.findGraph(graphId).toFinder().findNext(nodeId);
next.execute(output);
} catch (RemoteDataInstanceCreatorNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
@Override public void onError(Throwable throwable) {
......@@ -62,6 +73,7 @@ public class RemoteCommonServiceHandler extends RemoteCommonServiceGrpc.RemoteCo
}
@Override public void onCompleted() {
responseObserver.onNext(Empty.newBuilder().build());
responseObserver.onCompleted();
}
};
......
......@@ -19,36 +19,146 @@
package org.skywalking.apm.collector.remote.grpc.service;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.skywalking.apm.collector.client.grpc.GRPCClient;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.grpc.proto.Empty;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteMessage;
import org.skywalking.apm.collector.remote.service.RemoteClient;
import org.skywalking.apm.collector.remote.service.RemoteDataIDGetter;
import org.skywalking.apm.collector.remote.service.RemoteDataMappingIdNotFoundException;
import org.skywalking.apm.commons.datacarrier.DataCarrier;
import org.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class GRPCRemoteClient implements RemoteClient {
private final Logger logger = LoggerFactory.getLogger(GRPCRemoteClient.class);
private final GRPCRemoteSerializeService service;
private final StreamObserver<RemoteMessage> streamObserver;
private final GRPCClient client;
private final DataCarrier<RemoteMessage> carrier;
private final String address;
private final RemoteDataIDGetter remoteDataIDGetter;
public GRPCRemoteClient(String host, int port, StreamObserver<RemoteMessage> streamObserver) {
this.address = host + ":" + String.valueOf(port);
this.streamObserver = streamObserver;
GRPCRemoteClient(GRPCClient client, RemoteDataIDGetter remoteDataIDGetter, int channelSize, int bufferSize) {
this.address = client.toString();
this.client = client;
this.service = new GRPCRemoteSerializeService();
this.remoteDataIDGetter = remoteDataIDGetter;
this.carrier = new DataCarrier<>(channelSize, bufferSize);
this.carrier.setBufferStrategy(BufferStrategy.BLOCKING);
this.carrier.consume(new RemoteMessageConsumer(), 1);
}
@Override public final String getAddress() {
return this.address;
}
@Override public void send(int graphId, int nodeId, Data data) {
RemoteMessage.Builder builder = RemoteMessage.newBuilder();
builder.setGraphId(graphId);
builder.setNodeId(nodeId);
builder.setRemoteData(service.serialize(data));
@Override public void push(int graphId, int nodeId, Data data) {
try {
Integer remoteDataId = remoteDataIDGetter.getRemoteDataId(data.getClass());
RemoteMessage.Builder builder = RemoteMessage.newBuilder();
builder.setGraphId(graphId);
builder.setNodeId(nodeId);
builder.setRemoteDataId(remoteDataId);
builder.setRemoteData(service.serialize(data));
this.carrier.produce(builder.build());
logger.debug("put remote message into queue, id: {}", data.getId());
} catch (RemoteDataMappingIdNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
class RemoteMessageConsumer implements IConsumer<RemoteMessage> {
@Override public void init() {
}
@Override public void consume(List<RemoteMessage> remoteMessages) {
StreamObserver<RemoteMessage> streamObserver = createStreamObserver();
for (RemoteMessage remoteMessage : remoteMessages) {
streamObserver.onNext(remoteMessage);
}
streamObserver.onCompleted();
}
@Override public void onError(List<RemoteMessage> remoteMessages, Throwable t) {
logger.error(t.getMessage(), t);
}
@Override public void onExit() {
}
}
private StreamObserver<RemoteMessage> createStreamObserver() {
RemoteCommonServiceGrpc.RemoteCommonServiceStub stub = RemoteCommonServiceGrpc.newStub(client.getChannel());
StreamStatus status = new StreamStatus(false);
return stub.call(new StreamObserver<Empty>() {
@Override public void onNext(Empty empty) {
}
@Override public void onError(Throwable throwable) {
logger.error(throwable.getMessage(), throwable);
}
@Override public void onCompleted() {
status.finished();
}
});
}
class StreamStatus {
private final Logger logger = LoggerFactory.getLogger(StreamStatus.class);
private volatile boolean status;
StreamStatus(boolean status) {
this.status = status;
}
public boolean isFinish() {
return status;
}
void finished() {
this.status = true;
}
/**
* @param maxTimeout max wait time, milliseconds.
*/
public void wait4Finish(long maxTimeout) {
long time = 0;
while (!status) {
if (time > maxTimeout) {
break;
}
try2Sleep(5);
time += 5;
}
}
streamObserver.onNext(builder.build());
/**
* Try to sleep, and ignore the {@link InterruptedException}
*
* @param millis the length of time to sleep in milliseconds
*/
private void try2Sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
}
@Override public boolean equals(String address) {
......
......@@ -18,14 +18,11 @@
package org.skywalking.apm.collector.remote.grpc.service;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.client.ClientException;
import org.skywalking.apm.collector.client.grpc.GRPCClient;
import org.skywalking.apm.collector.remote.grpc.proto.Empty;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteMessage;
import org.skywalking.apm.collector.remote.service.RemoteClient;
import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.remote.service.RemoteDataIDGetter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -36,74 +33,19 @@ public class GRPCRemoteClientService implements RemoteClientService {
private final Logger logger = LoggerFactory.getLogger(GRPCRemoteClientService.class);
@Override public RemoteClient create(String host, int port) {
private final RemoteDataIDGetter remoteDataIDGetter;
GRPCRemoteClientService(RemoteDataIDGetter remoteDataIDGetter) {
this.remoteDataIDGetter = remoteDataIDGetter;
}
@Override public RemoteClient create(String host, int port, int channelSize, int bufferSize) {
GRPCClient client = new GRPCClient(host, port);
try {
client.initialize();
} catch (ClientException e) {
e.printStackTrace();
}
RemoteCommonServiceGrpc.RemoteCommonServiceStub stub = RemoteCommonServiceGrpc.newStub(client.getChannel());
StreamObserver<RemoteMessage> streamObserver = createStreamObserver(stub);
return new GRPCRemoteClient(host, port, streamObserver);
}
private StreamObserver<RemoteMessage> createStreamObserver(RemoteCommonServiceGrpc.RemoteCommonServiceStub stub) {
StreamStatus status = new StreamStatus(false);
return stub.call(new StreamObserver<Empty>() {
@Override public void onNext(Empty empty) {
}
@Override public void onError(Throwable throwable) {
logger.error(throwable.getMessage(), throwable);
}
@Override public void onCompleted() {
status.finished();
}
});
}
class StreamStatus {
private volatile boolean status;
public StreamStatus(boolean status) {
this.status = status;
}
public boolean isFinish() {
return status;
}
public void finished() {
this.status = true;
}
/**
* @param maxTimeout max wait time, milliseconds.
*/
public void wait4Finish(long maxTimeout) {
long time = 0;
while (!status) {
if (time > maxTimeout) {
break;
}
try2Sleep(5);
time += 5;
}
}
/**
* Try to sleep, and ignore the {@link InterruptedException}
*
* @param millis the length of time to sleep in milliseconds
*/
private void try2Sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
}
logger.error(e.getMessage(), e);
}
return new GRPCRemoteClient(client, remoteDataIDGetter, channelSize, bufferSize);
}
}
......@@ -43,8 +43,5 @@ public class GRPCRemoteDeserializeService implements RemoteDeserializeService<Re
for (int i = 0; i < remoteData.getDataDoublesCount(); i++) {
data.setDataDouble(i, remoteData.getDataDoubles(i));
}
for (int i = 0; i < remoteData.getDataBytesCount(); i++) {
data.setDataBytes(i, remoteData.getDataBytes(i).toByteArray());
}
}
}
......@@ -30,6 +30,7 @@ import org.skywalking.apm.collector.remote.grpc.service.selector.ForeverFirstSel
import org.skywalking.apm.collector.remote.grpc.service.selector.HashCodeSelector;
import org.skywalking.apm.collector.remote.grpc.service.selector.RollingSelector;
import org.skywalking.apm.collector.remote.service.RemoteClient;
import org.skywalking.apm.collector.remote.service.RemoteDataIDGetter;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.remote.service.Selector;
......@@ -38,13 +39,15 @@ import org.skywalking.apm.collector.remote.service.Selector;
*/
public class GRPCRemoteSenderService extends ClusterModuleListener implements RemoteSenderService {
public static final String PATH = "/" + RemoteModule.NAME + "/" + RemoteModuleGRPCProvider.NAME;
private static final String PATH = "/" + RemoteModule.NAME + "/" + RemoteModuleGRPCProvider.NAME;
private final GRPCRemoteClientService service;
private List<RemoteClient> remoteClients;
private final String selfAddress;
private final HashCodeSelector hashCodeSelector;
private final ForeverFirstSelector foreverFirstSelector;
private final RollingSelector rollingSelector;
private final int channelSize;
private final int bufferSize;
@Override public Mode send(int graphId, int nodeId, Data data, Selector selector) {
RemoteClient remoteClient;
......@@ -66,18 +69,21 @@ public class GRPCRemoteSenderService extends ClusterModuleListener implements Re
if (remoteClient.equals(selfAddress)) {
return Mode.Local;
} else {
remoteClient.send(graphId, nodeId, data);
remoteClient.push(graphId, nodeId, data);
return Mode.Remote;
}
}
public GRPCRemoteSenderService(String host, int port) {
this.service = new GRPCRemoteClientService();
public GRPCRemoteSenderService(String host, int port, int channelSize, int bufferSize,
RemoteDataIDGetter remoteDataIDGetter) {
this.service = new GRPCRemoteClientService(remoteDataIDGetter);
this.remoteClients = new ArrayList<>();
this.selfAddress = host + ":" + String.valueOf(port);
this.hashCodeSelector = new HashCodeSelector();
this.foreverFirstSelector = new ForeverFirstSelector();
this.rollingSelector = new RollingSelector();
this.channelSize = channelSize;
this.bufferSize = bufferSize;
}
@Override public String path() {
......@@ -90,7 +96,7 @@ public class GRPCRemoteSenderService extends ClusterModuleListener implements Re
String host = serverAddress.split(":")[0];
int port = Integer.parseInt(serverAddress.split(":")[1]);
RemoteClient remoteClient = service.create(host, port);
RemoteClient remoteClient = service.create(host, port, channelSize, bufferSize);
newRemoteClients.add(remoteClient);
Collections.sort(newRemoteClients);
......
......@@ -18,7 +18,6 @@
package org.skywalking.apm.collector.remote.grpc.service;
import com.google.protobuf.ByteString;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.remote.service.RemoteSerializeService;
......@@ -31,22 +30,22 @@ public class GRPCRemoteSerializeService implements RemoteSerializeService<Remote
@Override public RemoteData.Builder serialize(Data data) {
RemoteData.Builder builder = RemoteData.newBuilder();
for (int i = 0; i < data.getDataStringsCount(); i++) {
builder.setDataStrings(i, data.getDataString(i));
builder.addDataStrings(data.getDataString(i));
}
for (int i = 0; i < data.getDataIntegersCount(); i++) {
builder.setDataIntegers(i, data.getDataInteger(i));
builder.addDataIntegers(data.getDataInteger(i));
}
for (int i = 0; i < data.getDataLongsCount(); i++) {
builder.setDataLongs(i, data.getDataLong(i));
builder.addDataLongs(data.getDataLong(i));
}
for (int i = 0; i < data.getDataBooleansCount(); i++) {
builder.setDataBooleans(i, data.getDataBoolean(i));
builder.addDataBooleans(data.getDataBoolean(i));
}
for (int i = 0; i < data.getDataDoublesCount(); i++) {
builder.setDataDoubles(i, data.getDataDouble(i));
builder.addDataDoubles(data.getDataDouble(i));
}
for (int i = 0; i < data.getDataBytesCount(); i++) {
builder.setDataBytes(i, ByteString.copyFrom(data.getDataBytes(i)));
// builder.addDataBytes(ByteString.copyFrom(data.getDataBytes(i)));
}
return builder;
}
......
package org.skywalking.apm.collector.remote.grpc.service;
import org.skywalking.apm.collector.remote.service.DataReceiver;
import org.skywalking.apm.collector.remote.service.DataReceiverRegisterListener;
import org.skywalking.apm.collector.remote.service.RemoteServerService;
/**
* @author peng-yongsheng
*/
public class GRPCRemoteServerService implements RemoteServerService {
private DataReceiverRegisterListener listener;
public GRPCRemoteServerService(DataReceiverRegisterListener listener) {
this.listener = listener;
}
@Override public void registerReceiver(DataReceiver receiver) {
listener.setDataReceiver(receiver);
}
}
......@@ -11,7 +11,8 @@ service RemoteCommonService {
message RemoteMessage {
int32 graphId = 1;
int32 nodeId = 2;
RemoteData remoteData = 3;
int32 remoteDataId = 3;
RemoteData remoteData = 4;
}
message RemoteData {
......@@ -19,8 +20,8 @@ message RemoteData {
repeated int64 dataLongs = 2;
repeated double dataDoubles = 3;
repeated int32 dataIntegers = 4;
repeated bytes dataBytes = 5;
repeated bool dataBooleans = 6;
// repeated bytes dataBytes = 5;
repeated bool dataBooleans = 5;
}
message Empty {
......
......@@ -38,17 +38,4 @@ public abstract class AbstractRemoteWorker<INPUT extends Data, OUTPUT extends Da
}
public abstract Selector selector();
/**
* This method use for message producer to call for send message.
*
* @param message The persistence data or metric data.
* @throws Exception The Exception happen in {@link #onWork(Object)} )}
*/
public final void allocateJob(INPUT message) {
try {
onWork(message);
} catch (WorkerException e) {
}
}
}
......@@ -45,9 +45,8 @@ public abstract class AbstractRemoteWorkerProvider<INPUT extends Data, OUTPUT ex
/**
* Create the worker instance into akka system, the akka system will control the cluster worker life cycle.
*
* @return The created worker reference. See {@link RemoteWorkerRef}
* @throws ProviderNotFoundException This worker instance attempted to find a provider which use to create another
* worker instance, when the worker provider not find then Throw this Exception.
* @return The created worker reference. See {@link RemoteWorkerRef} worker instance, when the worker provider not
* find then Throw this Exception.
*/
@Override final public RemoteWorkerRef create(WorkerCreateListener workerCreateListener) {
WORKER_TYPE remoteWorker = workerInstance(getModuleManager());
......
......@@ -34,7 +34,7 @@ public class RemoteWorkerRef<INPUT extends Data, OUTPUT extends Data> extends Wo
private final RemoteSenderService remoteSenderService;
private final int graphId;
public RemoteWorkerRef(AbstractRemoteWorker<INPUT, OUTPUT> remoteWorker, RemoteSenderService remoteSenderService,
RemoteWorkerRef(AbstractRemoteWorker<INPUT, OUTPUT> remoteWorker, RemoteSenderService remoteSenderService,
int graphId) {
super(remoteWorker);
this.remoteWorker = remoteWorker;
......
......@@ -63,7 +63,7 @@ public abstract class AggregationWorker<INPUT extends Data, OUTPUT extends Data>
throw new WorkerException(e.getMessage(), e);
}
}
dataCache.getLast().asMap().forEach((String id, Data data) -> {
dataCache.getLast().collection().forEach((String id, Data data) -> {
logger.debug(data.toString());
onNext((OUTPUT)data);
});
......@@ -73,7 +73,7 @@ public abstract class AggregationWorker<INPUT extends Data, OUTPUT extends Data>
private void aggregate(INPUT message) {
dataCache.writing();
if (dataCache.containsKey(message.getId())) {
message.mergeData(dataCache.get(message.getId()));
dataCache.get(message.getId()).mergeData(message);
} else {
dataCache.put(message.getId(), message);
}
......
......@@ -86,8 +86,8 @@ public abstract class PersistenceWorker<INPUT extends Data, OUTPUT extends Data>
}
}
if (dataCache.getLast().asMap() != null) {
batchCollection = prepareBatch(dataCache.getLast().asMap());
if (dataCache.getLast().collection() != null) {
batchCollection = prepareBatch(dataCache.getLast().collection());
}
} finally {
dataCache.finishReadingLast();
......@@ -104,7 +104,7 @@ public abstract class PersistenceWorker<INPUT extends Data, OUTPUT extends Data>
if (ObjectUtils.isNotEmpty(dbData)) {
dbData.mergeData(data);
try {
updateBatchCollection.add(persistenceDAO().prepareBatchUpdate(data));
updateBatchCollection.add(persistenceDAO().prepareBatchUpdate(dbData));
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
......
......@@ -18,15 +18,20 @@
package org.skywalking.apm.collector.stream.worker.impl.data;
import org.skywalking.apm.collector.core.cache.Window;
import org.skywalking.apm.collector.core.data.Data;
/**
* @author peng-yongsheng
*/
public class DataCache extends Window {
public class DataCache extends Window<DataCollection> {
private DataCollection lockedDataCollection;
@Override public DataCollection collectionInstance() {
return new DataCollection();
}
public boolean containsKey(String id) {
return lockedDataCollection.containsKey(id);
}
......
......@@ -20,12 +20,13 @@ package org.skywalking.apm.collector.stream.worker.impl.data;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.skywalking.apm.collector.core.cache.Collection;
import org.skywalking.apm.collector.core.data.Data;
/**
* @author peng-yongsheng
*/
public class DataCollection {
public class DataCollection implements Collection<Map<String, Data>> {
private Map<String, Data> data;
private volatile boolean writing;
private volatile boolean reading;
......@@ -40,23 +41,23 @@ public class DataCollection {
writing = false;
}
public void writing() {
@Override public void writing() {
writing = true;
}
public boolean isWriting() {
@Override public boolean isWriting() {
return writing;
}
public void finishReading() {
@Override public void finishReading() {
reading = false;
}
public void reading() {
@Override public void reading() {
reading = true;
}
public boolean isReading() {
@Override public boolean isReading() {
return reading;
}
......@@ -72,15 +73,15 @@ public class DataCollection {
return data.get(key);
}
public int size() {
@Override public int size() {
return data.size();
}
public void clear() {
@Override public void clear() {
data.clear();
}
public Map<String, Data> asMap() {
public Map<String, Data> collection() {
return data;
}
}
......@@ -70,11 +70,6 @@
<artifactId>log4j-core</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>22.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
......
......@@ -91,12 +91,14 @@ public class ConsumerThread<T> extends Thread {
consumeList.add(element);
}
hasData = true;
}
try {
consumer.consume(consumeList);
} catch (Throwable t) {
consumer.onError(consumeList, t);
if (consumeList.size() > 0) {
try {
consumer.consume(consumeList);
} catch (Throwable t) {
consumer.onError(consumeList, t);
}
}
return hasData;
}
......
......@@ -31,8 +31,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.4.0</grpc.version>
<netty.version>4.1.12.Final</netty.version>
<grpc.version>1.7.0</grpc.version>
<compiler.version>1.6</compiler.version>
</properties>
......@@ -41,16 +40,6 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-handler-proxy</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
......@@ -62,16 +51,6 @@
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler-proxy</artifactId>
<version>${netty.version}</version>
</dependency>
</dependencies>
<build>
......@@ -104,7 +83,7 @@
<protocArtifact>com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier}
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.7.0:exe:${os.detected.classifier}
</pluginArtifact>
</configuration>
<executions>
......
......@@ -35,7 +35,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jetty.version>9.4.2.v20170220</jetty.version>
<grpc.version>1.4.0</grpc.version>
<grpc.version>1.7.0</grpc.version>
<bytebuddy.version>1.7.6</bytebuddy.version>
<shade.package>org.skywalking.apm.dependencies</shade.package>
......@@ -105,7 +105,7 @@
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
<version>1.4.0</version>
<version>${grpc.version}</version>
<exclusions>
<exclusion>
<artifactId>mockito-core</artifactId>
......
......@@ -164,9 +164,12 @@
<if>
<equals arg1="${project.packaging}" arg2="jar"/>
<then>
<mkdir dir="${project.build.directory}${sdk.plugin.related.dir}/../../../../packages/skywalking-agent/plugins"/>
<copy file="${project.build.directory}/${project.artifactId}-${project.version}.jar"
tofile="${project.build.directory}${sdk.plugin.related.dir}/../../../../packages/skywalking-agent/plugins/${project.artifactId}-${project.version}.jar" overwrite="true"/>
<mkdir
dir="${project.build.directory}${sdk.plugin.related.dir}/../../../../packages/skywalking-agent/plugins"/>
<copy
file="${project.build.directory}/${project.artifactId}-${project.version}.jar"
tofile="${project.build.directory}${sdk.plugin.related.dir}/../../../../packages/skywalking-agent/plugins/${project.artifactId}-${project.version}.jar"
overwrite="true"/>
</then>
</if>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册