From efe5299d03536e739c3d8022bf0deb56cbc46088 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BD=AD=E5=8B=87=E5=8D=87=20pengys?= <8082209@qq.com>
Date: Mon, 30 Jul 2018 09:54:36 +0800
Subject: [PATCH] Feature/oap/remote (#1505)
* Sample operator code.
* Indicator aggregator framework.
* Provide some annotation for OAL.
* Remote module.
* Register service.
* Add apache license header.
* Ignore comments when load definition files.
---
.../standalone/StandaloneManagerTest.java | 7 +-
...lusterModuleZookeeperProviderTestCase.java | 17 +-
oap-server/server-core/pom.xml | 50 ++++++
.../oap/server/core/CoreModule.java | 12 ++
.../oap/server/core/CoreModuleProvider.java | 51 +++---
.../core/analysis/DispatcherManager.java | 5 +-
.../server/core/analysis/data/StreamData.java | 4 +-
.../analysis/endpoint/EndpointDispatcher.java | 19 ++-
.../EndpointLatencyAvgAggregateWorker.java | 42 +++++
.../endpoint/EndpointLatencyAvgIndicator.java | 27 ++-
...> EndpointLatencyAvgPersistentWorker.java} | 12 +-
.../EndpointLatencyAvgRemoteWorker.java | 41 +++++
.../core/analysis/indicator/AvgIndicator.java | 16 +-
.../core/analysis/indicator/Indicator.java | 8 +-
.../indicator/annotation/IndicatorType.java | 2 +
.../define/IndicatorDefineLoadException.java | 29 ++++
.../indicator/define/IndicatorMapper.java | 82 +++++++++
.../AbstractAggregatorWorker.java} | 19 ++-
.../worker/AbstractPersistentWorker.java | 35 ++++
.../analysis/worker/AbstractRemoteWorker.java | 63 +++++++
.../server/core/analysis/worker/Worker.java | 29 ++++
.../define/WorkerDefineLoadException.java | 29 ++++
.../analysis/worker/define/WorkerMapper.java | 102 +++++++++++
.../server/core/cluster/RemoteInstance.java | 41 ++---
.../core/receiver/SourceReceiverImpl.java | 5 +-
.../server/core/remote/Deserializable.java | 28 ++++
.../core/remote/RemoteSenderService.java | 60 +++++++
.../core/remote/RemoteServiceHandler.java | 71 ++++++++
.../Serializable.java} | 8 +-
.../core/remote/client/GRPCRemoteClient.java | 158 ++++++++++++++++++
.../core/remote/client/RemoteClient.java | 33 ++++
.../remote/client/RemoteClientManager.java | 126 ++++++++++++++
.../core/remote/client/SelfRemoteClient.java | 53 ++++++
.../remote/selector/ForeverFirstSelector.java | 39 +++++
.../remote/selector/HashCodeSelector.java | 35 ++++
.../remote/selector/RemoteClientSelector.java | 30 ++++
.../core/remote/selector/RollingSelector.java | 42 +++++
.../core/remote/{ => selector}/Selector.java | 2 +-
.../src/main/proto/RemoteService.proto | 43 +++++
.../resources/META-INF/defines/indicator.def | 19 +++
.../resources/META-INF/defines/worker.def | 21 +++
.../define/IndicatorMapperTestCase.java | 36 ++++
.../indicator/define/TestAvgIndicator.java | 38 +++++
.../resources/META-INF/defines/indicator.def | 19 +++
.../resources/META-INF/defines/worker.def | 17 ++
.../server-core/src/test/resources/log4j2.xml | 31 ++++
.../library/client/grpc/GRPCClient.java | 5 +-
.../server/library/module/ModuleProvider.java | 3 +-
.../mesh/provider/MeshGRPCHandler.java | 2 -
49 files changed, 1542 insertions(+), 124 deletions(-)
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
rename oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/{EndpointLatencyAvgAggregator.java => EndpointLatencyAvgPersistentWorker.java} (70%)
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgRemoteWorker.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorDefineLoadException.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java
rename oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/{AbstractAggregator.java => worker/AbstractAggregatorWorker.java} (84%)
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/Worker.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerDefineLoadException.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Deserializable.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
rename oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/{analysis/data/RemoteData.java => remote/Serializable.java} (80%)
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/ForeverFirstSelector.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/HashCodeSelector.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RemoteClientSelector.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RollingSelector.java
rename oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/{ => selector}/Selector.java (93%)
create mode 100644 oap-server/server-core/src/main/proto/RemoteService.proto
create mode 100644 oap-server/server-core/src/main/resources/META-INF/defines/indicator.def
create mode 100644 oap-server/server-core/src/main/resources/META-INF/defines/worker.def
create mode 100644 oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapperTestCase.java
create mode 100644 oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
create mode 100644 oap-server/server-core/src/test/resources/META-INF/defines/indicator.def
create mode 100644 oap-server/server-core/src/test/resources/META-INF/defines/worker.def
create mode 100644 oap-server/server-core/src/test/resources/log4j2.xml
diff --git a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java
index 2e69130300..55671bce0d 100644
--- a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java
+++ b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java
@@ -19,15 +19,14 @@
package org.apache.skywalking.oap.server.cluster.plugin.standalone;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.*;
public class StandaloneManagerTest {
@Test
public void test() {
StandaloneManager standaloneManager = new StandaloneManager();
- RemoteInstance remote1 = new RemoteInstance();
- RemoteInstance remote2 = new RemoteInstance();
+ RemoteInstance remote1 = new RemoteInstance("A", 100, true);
+ RemoteInstance remote2 = new RemoteInstance("B", 100, false);
standaloneManager.registerRemote(remote1);
Assert.assertEquals(remote1, standaloneManager.queryRemoteNodes().get(0));
diff --git a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java
index bbffa9ffcb..a318481ed5 100644
--- a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java
+++ b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java
@@ -21,16 +21,9 @@ package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
import java.io.IOException;
import java.util.List;
import org.apache.curator.test.TestingServer;
-import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
-import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
-import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.junit.*;
/**
* @author peng-yongsheng
@@ -59,9 +52,7 @@ public class ClusterModuleZookeeperProviderTestCase {
ClusterRegister moduleRegister = provider.getService(ClusterRegister.class);
ClusterNodesQuery clusterNodesQuery = provider.getService(ClusterNodesQuery.class);
- RemoteInstance remoteInstance = new RemoteInstance();
- remoteInstance.setHost("ProviderAHost");
- remoteInstance.setPort(1000);
+ RemoteInstance remoteInstance = new RemoteInstance("ProviderAHost", 1000, true);
moduleRegister.registerRemote(remoteInstance);
diff --git a/oap-server/server-core/pom.xml b/oap-server/server-core/pom.xml
index 767e1c8bd7..ca6ac7abe1 100644
--- a/oap-server/server-core/pom.xml
+++ b/oap-server/server-core/pom.xml
@@ -41,6 +41,11 @@
library-util${project.version}
+
+ org.apache.skywalking
+ library-client
+ ${project.version}
+ org.apache.skywalkinglibrary-server
@@ -57,4 +62,49 @@
${project.version}
+
+
+
+
+ kr.motd.maven
+ os-maven-plugin
+ 1.4.1.Final
+
+
+
+
+ org.apache.maven.plugins
+ maven-resources-plugin
+ 2.4.3
+
+ ${project.build.sourceEncoding}
+
+
+
+ org.xolstice.maven.plugins
+ protobuf-maven-plugin
+ 0.5.0
+
+
+ com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}
+
+ grpc-java
+ io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier}
+
+
+
+
+
+ compile
+ compile-custom
+
+
+
+
+
+
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
index 6f3de48753..b03152fd6a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
@@ -19,7 +19,11 @@
package org.apache.skywalking.oap.server.core;
import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper;
+import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
import org.apache.skywalking.oap.server.core.receiver.SourceReceiver;
+import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
@@ -38,6 +42,7 @@ public class CoreModule extends ModuleDefine {
List classes = new ArrayList<>();
addServerInterface(classes);
addReceiverInterface(classes);
+ addInsideService(classes);
return classes.toArray(new Class[] {});
}
@@ -47,6 +52,13 @@ public class CoreModule extends ModuleDefine {
classes.add(JettyHandlerRegister.class);
}
+ private void addInsideService(List classes) {
+ classes.add(IndicatorMapper.class);
+ classes.add(WorkerMapper.class);
+ classes.add(RemoteClientManager.class);
+ classes.add(RemoteSenderService.class);
+ }
+
private void addReceiverInterface(List classes) {
classes.add(SourceReceiver.class);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 1af3873de7..757c34a765 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -18,24 +18,18 @@
package org.apache.skywalking.oap.server.core;
-import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
-import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
-import org.apache.skywalking.oap.server.core.receiver.SourceReceiver;
-import org.apache.skywalking.oap.server.core.receiver.SourceReceiverImpl;
-import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
-import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
-import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
-import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-import org.apache.skywalking.oap.server.library.module.ModuleProvider;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.core.analysis.indicator.define.*;
+import org.apache.skywalking.oap.server.core.analysis.worker.define.*;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.receiver.*;
+import org.apache.skywalking.oap.server.core.remote.*;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
+import org.apache.skywalking.oap.server.core.server.*;
+import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* @author peng-yongsheng
@@ -47,17 +41,21 @@ public class CoreModuleProvider extends ModuleProvider {
private final CoreModuleConfig moduleConfig;
private GRPCServer grpcServer;
private JettyServer jettyServer;
+ private final IndicatorMapper indicatorMapper;
+ private final WorkerMapper workerMapper;
public CoreModuleProvider() {
super();
this.moduleConfig = new CoreModuleConfig();
+ this.indicatorMapper = new IndicatorMapper();
+ this.workerMapper = new WorkerMapper(getManager());
}
@Override public String name() {
return "default";
}
- @Override public Class module() {
+ @Override public Class extends ModuleDefine> module() {
return CoreModule.class;
}
@@ -75,11 +73,24 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
this.registerServiceImplementation(JettyHandlerRegister.class, new JettyHandlerRegisterImpl(jettyServer));
- this.registerServiceImplementation(SourceReceiver.class, new SourceReceiverImpl());
+ this.registerServiceImplementation(SourceReceiver.class, new SourceReceiverImpl(getManager()));
+
+ this.registerServiceImplementation(IndicatorMapper.class, indicatorMapper);
+ this.registerServiceImplementation(WorkerMapper.class, workerMapper);
+
+ this.registerServiceImplementation(RemoteClientManager.class, new RemoteClientManager(getManager()));
+ this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
}
- @Override public void start() {
+ @Override public void start() throws ModuleStartException {
+ grpcServer.addHandler(new RemoteServiceHandler(getManager()));
+ try {
+ indicatorMapper.load();
+ workerMapper.load();
+ } catch (IndicatorDefineLoadException | WorkerDefineLoadException e) {
+ throw new ModuleStartException(e.getMessage(), e);
+ }
}
@Override public void notifyAfterCompleted() throws ModuleStartException {
@@ -90,9 +101,7 @@ public class CoreModuleProvider extends ModuleProvider {
throw new ModuleStartException(e.getMessage(), e);
}
- RemoteInstance gRPCServerInstance = new RemoteInstance();
- gRPCServerInstance.setHost(moduleConfig.getGRPCHost());
- gRPCServerInstance.setPort(moduleConfig.getGRPCPort());
+ RemoteInstance gRPCServerInstance = new RemoteInstance(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true);
this.getManager().find(ClusterModule.NAME).getService(ClusterRegister.class).registerRemote(gRPCServerInstance);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
index de8da4e4e3..9317382735 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core.analysis;
import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointDispatcher;
import org.apache.skywalking.oap.server.core.receiver.Scope;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
/**
@@ -32,9 +33,9 @@ public class DispatcherManager {
private Map dispatcherMap;
- public DispatcherManager() {
+ public DispatcherManager(ModuleManager moduleManager) {
this.dispatcherMap = new HashMap<>();
- this.dispatcherMap.put(Scope.Endpoint, new EndpointDispatcher());
+ this.dispatcherMap.put(Scope.Endpoint, new EndpointDispatcher(moduleManager));
}
public SourceDispatcher getDispatcher(Scope scope) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java
index 63f149f7b9..57c75d4497 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java
@@ -18,10 +18,12 @@
package org.apache.skywalking.oap.server.core.analysis.data;
+import org.apache.skywalking.oap.server.core.remote.*;
+
/**
* @author peng-yongsheng
*/
-public abstract class StreamData implements QueueData {
+public abstract class StreamData implements QueueData, Serializable, Deserializable {
private EndOfBatchContext endOfBatchContext;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java
index 152ac5eb93..bcb3a2c665 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java
@@ -18,18 +18,22 @@
package org.apache.skywalking.oap.server.core.analysis.endpoint;
+import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
import org.apache.skywalking.oap.server.core.receiver.Endpoint;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author peng-yongsheng
*/
public class EndpointDispatcher implements SourceDispatcher {
- private final EndpointLatencyAvgAggregator avgAggregator;
+ private final ModuleManager moduleManager;
+ private EndpointLatencyAvgAggregateWorker avgAggregator;
- public EndpointDispatcher() {
- this.avgAggregator = new EndpointLatencyAvgAggregator();
+ public EndpointDispatcher(ModuleManager moduleManager) {
+ this.moduleManager = moduleManager;
}
@Override public void dispatch(Endpoint source) {
@@ -37,7 +41,14 @@ public class EndpointDispatcher implements SourceDispatcher {
}
private void avg(Endpoint source) {
- EndpointLatencyAvgIndicator indicator = new EndpointLatencyAvgIndicator(source.getTimeBucket(), source.getId());
+ if (avgAggregator == null) {
+ WorkerMapper workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class);
+ avgAggregator = (EndpointLatencyAvgAggregateWorker)workerMapper.findInstanceByClass(EndpointLatencyAvgAggregateWorker.class);
+ }
+
+ EndpointLatencyAvgIndicator indicator = new EndpointLatencyAvgIndicator();
+ indicator.setId(source.getId());
+ indicator.setTimeBucket(source.getTimeBucket());
indicator.combine(source.getLatency(), 1);
avgAggregator.in(indicator);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
new file mode 100644
index 0000000000..62d51341e4
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.endpoint;
+
+import org.apache.skywalking.oap.server.core.analysis.worker.AbstractAggregatorWorker;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class EndpointLatencyAvgAggregateWorker extends AbstractAggregatorWorker {
+
+ private static final Logger logger = LoggerFactory.getLogger(EndpointLatencyAvgAggregateWorker.class);
+
+ private final EndpointLatencyAvgRemoteWorker remoter;
+
+ public EndpointLatencyAvgAggregateWorker(ModuleManager moduleManager) {
+ super(moduleManager);
+ this.remoter = new EndpointLatencyAvgRemoteWorker(moduleManager);
+ }
+
+ @Override protected void onNext(EndpointLatencyAvgIndicator data) {
+ remoter.in(data);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
index a200aeba7e..e339afa733 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
@@ -18,19 +18,16 @@
package org.apache.skywalking.oap.server.core.analysis.endpoint;
+import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
/**
* @author peng-yongsheng
*/
public class EndpointLatencyAvgIndicator extends AvgIndicator {
- private final int id;
-
- public EndpointLatencyAvgIndicator(long timeBucket, int id) {
- super(timeBucket);
- this.id = id;
- }
+ @Setter @Getter private int id;
@Override public int hashCode() {
int result = 17;
@@ -55,4 +52,22 @@ public class EndpointLatencyAvgIndicator extends AvgIndicator {
return true;
}
+
+ @Override public RemoteData.Builder serialize() {
+ RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
+ remoteBuilder.setDataIntegers(0, getId());
+ remoteBuilder.setDataIntegers(1, getCount());
+
+ remoteBuilder.setDataLongs(0, getTimeBucket());
+ remoteBuilder.setDataLongs(1, getSummation());
+ return remoteBuilder;
+ }
+
+ @Override public void deserialize(RemoteData remoteData) {
+ setId(remoteData.getDataIntegers(0));
+ setCount(remoteData.getDataIntegers(1));
+
+ setTimeBucket(remoteData.getDataLongs(0));
+ setSummation(remoteData.getDataLongs(1));
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
similarity index 70%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
index f40990976c..e3c5c23069 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
@@ -18,17 +18,15 @@
package org.apache.skywalking.oap.server.core.analysis.endpoint;
-import org.apache.skywalking.oap.server.core.analysis.AbstractAggregator;
-import org.slf4j.*;
+import org.apache.skywalking.oap.server.core.analysis.worker.AbstractPersistentWorker;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author peng-yongsheng
*/
-public class EndpointLatencyAvgAggregator extends AbstractAggregator {
-
- private static final Logger logger = LoggerFactory.getLogger(EndpointLatencyAvgAggregator.class);
-
- @Override protected void onNext(EndpointLatencyAvgIndicator data) {
+public class EndpointLatencyAvgPersistentWorker extends AbstractPersistentWorker {
+ public EndpointLatencyAvgPersistentWorker(ModuleManager moduleManager) {
+ super(moduleManager);
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgRemoteWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgRemoteWorker.java
new file mode 100644
index 0000000000..47f75215ab
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgRemoteWorker.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.endpoint;
+
+import org.apache.skywalking.oap.server.core.analysis.worker.AbstractRemoteWorker;
+import org.apache.skywalking.oap.server.core.remote.selector.Selector;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+/**
+ * @author peng-yongsheng
+ */
+public class EndpointLatencyAvgRemoteWorker extends AbstractRemoteWorker {
+
+ public EndpointLatencyAvgRemoteWorker(ModuleManager moduleManager) {
+ super(moduleManager);
+ }
+
+ @Override public Selector selector() {
+ return Selector.HashCode;
+ }
+
+ @Override public Class nextWorkerClass() {
+ return EndpointLatencyAvgPersistentWorker.class;
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
index 54ca9d68c2..da065f37f1 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
@@ -18,28 +18,26 @@
package org.apache.skywalking.oap.server.core.analysis.indicator;
+import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*;
+import org.apache.skywalking.oap.server.core.remote.selector.Selector;
/**
* @author peng-yongsheng
*/
-@IndicatorType
+@IndicatorType(selector = Selector.HashCode)
public abstract class AvgIndicator extends Indicator {
- private long summation;
- private int count;
-
- public AvgIndicator(long timeBucket) {
- super(timeBucket);
- }
+ @Getter @Setter private long summation;
+ @Getter @Setter private int count;
@Entrance
- public void combine(@SourceFrom long summation, @ConstOne int count) {
+ public final void combine(@SourceFrom long summation, @ConstOne int count) {
this.summation += summation;
this.count += count;
}
- @Override public void combine(Indicator indicator) {
+ @Override public final void combine(Indicator indicator) {
AvgIndicator avgIndicator = (AvgIndicator)indicator;
combine(avgIndicator.summation, avgIndicator.count);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
index 66bb57d586..533379e7d8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
@@ -18,7 +18,7 @@
package org.apache.skywalking.oap.server.core.analysis.indicator;
-import lombok.Getter;
+import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.data.StreamData;
/**
@@ -26,11 +26,7 @@ import org.apache.skywalking.oap.server.core.analysis.data.StreamData;
*/
public abstract class Indicator extends StreamData {
- @Getter private final long timeBucket;
-
- public Indicator(long timeBucket) {
- this.timeBucket = timeBucket;
- }
+ @Getter @Setter private long timeBucket;
public abstract void combine(Indicator indicator);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
index 7e8b8e1a68..46f03450ff 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
import java.lang.annotation.*;
+import org.apache.skywalking.oap.server.core.remote.selector.Selector;
/**
* @author peng-yongsheng
@@ -26,4 +27,5 @@ import java.lang.annotation.*;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.SOURCE)
public @interface IndicatorType {
+ Selector selector();
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorDefineLoadException.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorDefineLoadException.java
new file mode 100644
index 0000000000..ca0521a97a
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorDefineLoadException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.indicator.define;
+
+/**
+ * @author peng-yongsheng
+ */
+public class IndicatorDefineLoadException extends Exception {
+
+ public IndicatorDefineLoadException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java
new file mode 100644
index 0000000000..8513fa2cc9
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.indicator.define;
+
+import java.io.*;
+import java.net.URL;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.library.module.Service;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class IndicatorMapper implements Service {
+
+ private static final Logger logger = LoggerFactory.getLogger(IndicatorMapper.class);
+
+ private int id = 0;
+ private final Map, Integer> classKeyMapping;
+ private final Map> idKeyMapping;
+
+ public IndicatorMapper() {
+ this.classKeyMapping = new HashMap<>();
+ this.idKeyMapping = new HashMap<>();
+ }
+
+ @SuppressWarnings(value = "unchecked")
+ public void load() throws IndicatorDefineLoadException {
+ try {
+ List indicatorClasses = new LinkedList<>();
+
+ Enumeration urlEnumeration = this.getClass().getClassLoader().getResources("META-INF/defines/indicator.def");
+ while (urlEnumeration.hasMoreElements()) {
+ URL definitionFileURL = urlEnumeration.nextElement();
+ logger.info("Load indicator definition file url: {}", definitionFileURL.getPath());
+ BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(definitionFileURL.openStream()));
+ Properties properties = new Properties();
+ properties.load(bufferedReader);
+
+ Enumeration defineItem = properties.propertyNames();
+ while (defineItem.hasMoreElements()) {
+ String fullNameClass = (String)defineItem.nextElement();
+ indicatorClasses.add(fullNameClass);
+ }
+ }
+
+ for (String indicatorClassName : indicatorClasses) {
+ Class indicatorClass = (Class)Class.forName(indicatorClassName);
+ id++;
+ classKeyMapping.put(indicatorClass, id);
+ idKeyMapping.put(id, indicatorClass);
+ }
+ } catch (IOException | ClassNotFoundException e) {
+ throw new IndicatorDefineLoadException(e.getMessage(), e);
+ }
+ }
+
+ public int findIdByClass(Class indicatorClass) {
+ return classKeyMapping.get(indicatorClass);
+ }
+
+ public Class findClassById(int id) {
+ return idKeyMapping.get(id);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/AbstractAggregator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
similarity index 84%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/AbstractAggregator.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
index f4e769ade6..65d68b4408 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/AbstractAggregator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
@@ -16,35 +16,36 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis;
+package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
-public abstract class AbstractAggregator {
+public abstract class AbstractAggregatorWorker extends Worker {
- private static final Logger logger = LoggerFactory.getLogger(AbstractAggregator.class);
+ private static final Logger logger = LoggerFactory.getLogger(AbstractAggregatorWorker.class);
private final DataCarrier dataCarrier;
private final MergeDataCache mergeDataCache;
private int messageNum;
- public AbstractAggregator() {
+ public AbstractAggregatorWorker(ModuleManager moduleManager) {
this.mergeDataCache = new MergeDataCache<>();
this.dataCarrier = new DataCarrier<>(1, 10000);
this.dataCarrier.consume(new AggregatorConsumer(this), 1);
}
- public void in(INPUT message) {
- message.setEndOfBatchContext(new EndOfBatchContext(false));
- dataCarrier.produce(message);
+ @Override public final void in(INPUT input) {
+ input.setEndOfBatchContext(new EndOfBatchContext(false));
+ dataCarrier.produce(input);
}
private void onWork(INPUT message) {
@@ -91,9 +92,9 @@ public abstract class AbstractAggregator {
private class AggregatorConsumer implements IConsumer {
- private final AbstractAggregator aggregator;
+ private final AbstractAggregatorWorker aggregator;
- private AggregatorConsumer(AbstractAggregator aggregator) {
+ private AggregatorConsumer(AbstractAggregatorWorker aggregator) {
this.aggregator = aggregator;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
new file mode 100644
index 0000000000..bcead67924
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+/**
+ * @author peng-yongsheng
+ */
+public abstract class AbstractPersistentWorker extends Worker {
+
+ public AbstractPersistentWorker(ModuleManager moduleManager) {
+ }
+
+ @Override public final void in(INPUT input) {
+
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java
new file mode 100644
index 0000000000..e3fb89d0c5
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
+import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
+import org.apache.skywalking.oap.server.core.remote.selector.Selector;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public abstract class AbstractRemoteWorker extends Worker {
+
+ private static final Logger logger = LoggerFactory.getLogger(AbstractRemoteWorker.class);
+
+ private final ModuleManager moduleManager;
+ private RemoteSenderService remoteSender;
+ private WorkerMapper workerMapper;
+
+ public AbstractRemoteWorker(ModuleManager moduleManager) {
+ this.moduleManager = moduleManager;
+ }
+
+ @Override public final void in(INPUT input) {
+ if (remoteSender == null) {
+ remoteSender = moduleManager.find(CoreModule.NAME).getService(RemoteSenderService.class);
+ }
+ if (workerMapper == null) {
+ workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class);
+ }
+
+ try {
+ int nextWorkerId = workerMapper.findIdByClass(nextWorkerClass());
+ remoteSender.send(nextWorkerId, input, selector());
+ } catch (Throwable e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+
+ public abstract Class nextWorkerClass();
+
+ public abstract Selector selector();
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/Worker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/Worker.java
new file mode 100644
index 0000000000..53010eedfa
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/Worker.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+
+/**
+ * @author peng-yongsheng
+ */
+public abstract class Worker {
+
+ public abstract void in(INPUT input);
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerDefineLoadException.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerDefineLoadException.java
new file mode 100644
index 0000000000..a24a0a7a7b
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerDefineLoadException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker.define;
+
+/**
+ * @author peng-yongsheng
+ */
+public class WorkerDefineLoadException extends Exception {
+
+ public WorkerDefineLoadException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java
new file mode 100644
index 0000000000..9732430569
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker.define;
+
+import java.io.*;
+import java.lang.reflect.Constructor;
+import java.net.URL;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.worker.Worker;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class WorkerMapper implements Service {
+
+ private static final Logger logger = LoggerFactory.getLogger(WorkerMapper.class);
+
+ private int id = 0;
+ private final ModuleManager moduleManager;
+ private final Map, Integer> classKeyMapping;
+ private final Map> idKeyMapping;
+ private final Map, Worker> classKeyInstanceMapping;
+ private final Map idKeyInstanceMapping;
+
+ public WorkerMapper(ModuleManager moduleManager) {
+ this.moduleManager = moduleManager;
+ this.classKeyMapping = new HashMap<>();
+ this.idKeyMapping = new HashMap<>();
+ this.classKeyInstanceMapping = new HashMap<>();
+ this.idKeyInstanceMapping = new HashMap<>();
+ }
+
+ @SuppressWarnings(value = "unchecked")
+ public void load() throws WorkerDefineLoadException {
+ try {
+ List workerClasses = new LinkedList<>();
+
+ Enumeration urlEnumeration = this.getClass().getClassLoader().getResources("META-INF/defines/worker.def");
+ while (urlEnumeration.hasMoreElements()) {
+ URL definitionFileURL = urlEnumeration.nextElement();
+ logger.info("Load worker definition file url: {}", definitionFileURL.getPath());
+ BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(definitionFileURL.openStream()));
+ Properties properties = new Properties();
+ properties.load(bufferedReader);
+
+ Enumeration defineItem = properties.propertyNames();
+ while (defineItem.hasMoreElements()) {
+ String fullNameClass = (String)defineItem.nextElement();
+ workerClasses.add(fullNameClass);
+ }
+ }
+
+ for (String workerClassName : workerClasses) {
+ Class workerClass = (Class)Class.forName(workerClassName);
+ id++;
+ classKeyMapping.put(workerClass, id);
+ idKeyMapping.put(id, workerClass);
+
+ Constructor constructor = workerClass.getDeclaredConstructor(ModuleManager.class);
+ Worker worker = constructor.newInstance(moduleManager);
+ classKeyInstanceMapping.put(workerClass, worker);
+ idKeyInstanceMapping.put(id, worker);
+ }
+ } catch (Exception e) {
+ throw new WorkerDefineLoadException(e.getMessage(), e);
+ }
+ }
+
+ public int findIdByClass(Class workerClass) {
+ return classKeyMapping.get(workerClass);
+ }
+
+ public Class findClassById(int id) {
+ return idKeyMapping.get(id);
+ }
+
+ public Worker findInstanceByClass(Class workerClass) {
+ return classKeyInstanceMapping.get(workerClass);
+ }
+
+ public Worker findInstanceById(int id) {
+ return idKeyInstanceMapping.get(id);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
index 53c4ea0cc1..1e85b3252b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
@@ -19,48 +19,29 @@
package org.apache.skywalking.oap.server.core.cluster;
import java.util.Objects;
+import lombok.*;
/**
* @author peng-yongsheng
*/
-public class RemoteInstance {
+public class RemoteInstance implements Comparable {
- private String host;
- private int port;
- private boolean self = false;
+ @Getter private final String host;
+ @Getter private final int port;
+ @Getter @Setter private boolean isSelf = false;
- public RemoteInstance() {
-
- }
-
- public RemoteInstance(String host, int port, boolean self) {
+ public RemoteInstance(String host, int port, boolean isSelf) {
this.host = host;
this.port = port;
- this.self = self;
- }
-
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public int getPort() {
- return port;
- }
-
- public void setPort(int port) {
- this.port = port;
+ this.isSelf = isSelf;
}
- public boolean isSelf() {
- return self;
+ @Override public int compareTo(RemoteInstance o) {
+ return toString().compareTo(toString());
}
- public void setSelf(boolean self) {
- this.self = self;
+ @Override public String toString() {
+ return host + String.valueOf(port);
}
@Override public boolean equals(Object o) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/SourceReceiverImpl.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/SourceReceiverImpl.java
index 4d04a31dd3..a6cf211ec6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/SourceReceiverImpl.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/SourceReceiverImpl.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.receiver;
import org.apache.skywalking.oap.server.core.analysis.DispatcherManager;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author peng-yongsheng
@@ -27,8 +28,8 @@ public class SourceReceiverImpl implements SourceReceiver {
private final DispatcherManager dispatcherManager;
- public SourceReceiverImpl() {
- this.dispatcherManager = new DispatcherManager();
+ public SourceReceiverImpl(ModuleManager moduleManager) {
+ this.dispatcherManager = new DispatcherManager(moduleManager);
}
@Override public void receive(Source source) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Deserializable.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Deserializable.java
new file mode 100644
index 0000000000..b2b3a7ecbd
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Deserializable.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote;
+
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+
+/**
+ * @author peng-yongsheng
+ */
+public interface Deserializable {
+ void deserialize(RemoteData remoteData);
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
new file mode 100644
index 0000000000..b2ce1c45ec
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote;
+
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.remote.client.*;
+import org.apache.skywalking.oap.server.core.remote.selector.*;
+import org.apache.skywalking.oap.server.library.module.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class RemoteSenderService implements Service {
+
+ private final ModuleManager moduleManager;
+ private final HashCodeSelector hashCodeSelector;
+ private final ForeverFirstSelector foreverFirstSelector;
+ private final RollingSelector rollingSelector;
+
+ public RemoteSenderService(ModuleManager moduleManager) {
+ this.moduleManager = moduleManager;
+ this.hashCodeSelector = new HashCodeSelector();
+ this.foreverFirstSelector = new ForeverFirstSelector();
+ this.rollingSelector = new RollingSelector();
+ }
+
+ public void send(int nextWorkId, Indicator indicator, Selector selector) {
+ RemoteClientManager clientManager = moduleManager.find(CoreModule.NAME).getService(RemoteClientManager.class);
+
+ RemoteClient remoteClient;
+ switch (selector) {
+ case HashCode:
+ remoteClient = hashCodeSelector.select(clientManager.getRemoteClient(), indicator);
+ remoteClient.push(nextWorkId, indicator);
+ case Rolling:
+ remoteClient = rollingSelector.select(clientManager.getRemoteClient(), indicator);
+ remoteClient.push(nextWorkId, indicator);
+ case ForeverFirst:
+ remoteClient = foreverFirstSelector.select(clientManager.getRemoteClient(), indicator);
+ remoteClient.push(nextWorkId, indicator);
+ }
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
new file mode 100644
index 0000000000..892e9516dd
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper;
+import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBase implements GRPCHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(RemoteServiceHandler.class);
+
+ private final IndicatorMapper indicatorMapper;
+ private final WorkerMapper workerMapper;
+
+ public RemoteServiceHandler(ModuleManager moduleManager) {
+ this.indicatorMapper = moduleManager.find(CoreModule.NAME).getService(IndicatorMapper.class);
+ this.workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class);
+ }
+
+ @Override public StreamObserver call(StreamObserver responseObserver) {
+ return new StreamObserver() {
+ @Override public void onNext(RemoteMessage message) {
+ int indicatorId = message.getIndicatorId();
+ int nextWorkerId = message.getNextWorkerId();
+ RemoteData remoteData = message.getRemoteData();
+
+ Class indicatorClass = indicatorMapper.findClassById(indicatorId);
+ try {
+ indicatorClass.newInstance().deserialize(remoteData);
+ } catch (InstantiationException | IllegalAccessException e) {
+ logger.warn(e.getMessage());
+ }
+ }
+
+ @Override public void onError(Throwable throwable) {
+ logger.error(throwable.getMessage(), throwable);
+ }
+
+ @Override public void onCompleted() {
+ responseObserver.onNext(Empty.newBuilder().build());
+ responseObserver.onCompleted();
+ }
+ };
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/RemoteData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Serializable.java
similarity index 80%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/RemoteData.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Serializable.java
index fe4f1f5b5b..1a7dbede3a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/RemoteData.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Serializable.java
@@ -16,11 +16,13 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis.data;
+package org.apache.skywalking.oap.server.core.remote;
+
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
/**
* @author peng-yongsheng
*/
-public interface RemoteData {
- String selectKey();
+public interface Serializable {
+ RemoteData.Builder serialize();
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
new file mode 100644
index 0000000000..75a5952adc
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import io.grpc.stub.StreamObserver;
+import java.util.List;
+import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
+import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class GRPCRemoteClient implements RemoteClient, Comparable {
+
+ private static final Logger logger = LoggerFactory.getLogger(GRPCRemoteClient.class);
+
+ private final GRPCClient client;
+ private final DataCarrier carrier;
+ private final IndicatorMapper indicatorMapper;
+
+ public GRPCRemoteClient(IndicatorMapper indicatorMapper, RemoteInstance remoteInstance, int channelSize,
+ int bufferSize) {
+ this.indicatorMapper = indicatorMapper;
+ this.client = new GRPCClient(remoteInstance.getHost(), remoteInstance.getPort());
+ this.carrier = new DataCarrier<>(channelSize, bufferSize);
+ this.carrier.setBufferStrategy(BufferStrategy.BLOCKING);
+ this.carrier.consume(new RemoteMessageConsumer(), 1);
+ }
+
+ @Override public void push(int nextWorkerId, Indicator indicator) {
+ int indicatorId = indicatorMapper.findIdByClass(indicator.getClass());
+ RemoteMessage.Builder builder = RemoteMessage.newBuilder();
+ builder.setNextWorkerId(nextWorkerId);
+ builder.setIndicatorId(indicatorId);
+ builder.setRemoteData(indicator.serialize());
+
+ this.carrier.produce(builder.build());
+ }
+
+ class RemoteMessageConsumer implements IConsumer {
+ @Override public void init() {
+ }
+
+ @Override public void consume(List remoteMessages) {
+ StreamObserver streamObserver = createStreamObserver();
+ for (RemoteMessage remoteMessage : remoteMessages) {
+ streamObserver.onNext(remoteMessage);
+ }
+ streamObserver.onCompleted();
+ }
+
+ @Override public void onError(List remoteMessages, Throwable t) {
+ logger.error(t.getMessage(), t);
+ }
+
+ @Override public void onExit() {
+ }
+ }
+
+ private StreamObserver createStreamObserver() {
+ RemoteServiceGrpc.RemoteServiceStub stub = RemoteServiceGrpc.newStub(client.getChannel());
+
+ StreamStatus status = new StreamStatus(false);
+ return stub.call(new StreamObserver() {
+ @Override public void onNext(Empty empty) {
+ }
+
+ @Override public void onError(Throwable throwable) {
+ logger.error(throwable.getMessage(), throwable);
+ }
+
+ @Override public void onCompleted() {
+ status.finished();
+ }
+ });
+ }
+
+ class StreamStatus {
+
+ private final Logger logger = LoggerFactory.getLogger(StreamStatus.class);
+
+ private volatile boolean status;
+
+ StreamStatus(boolean status) {
+ this.status = status;
+ }
+
+ public boolean isFinish() {
+ return status;
+ }
+
+ void finished() {
+ this.status = true;
+ }
+
+ /**
+ * @param maxTimeout max wait time, milliseconds.
+ */
+ public void wait4Finish(long maxTimeout) {
+ long time = 0;
+ while (!status) {
+ if (time > maxTimeout) {
+ break;
+ }
+ try2Sleep(5);
+ time += 5;
+ }
+ }
+
+ /**
+ * Try to sleep, and ignore the {@link InterruptedException}
+ *
+ * @param millis the length of time to sleep in milliseconds
+ */
+ private void try2Sleep(long millis) {
+ try {
+ Thread.sleep(millis);
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override public int compareTo(GRPCRemoteClient o) {
+ return this.client.toString().compareTo(o.client.toString());
+ }
+
+ public String getHost() {
+ return client.getHost();
+ }
+
+ public int getPort() {
+ return client.getPort();
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
new file mode 100644
index 0000000000..98d23f1017
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+
+/**
+ * @author peng-yongsheng
+ */
+public interface RemoteClient {
+
+ String getHost();
+
+ int getPort();
+
+ void push(int nextWorkerId, Indicator indicator);
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
new file mode 100644
index 0000000000..4d2049babc
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import java.util.*;
+import java.util.concurrent.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class RemoteClientManager implements Service {
+
+ private static final Logger logger = LoggerFactory.getLogger(RemoteClientManager.class);
+
+ private final ModuleManager moduleManager;
+ private IndicatorMapper indicatorMapper;
+ private ClusterNodesQuery clusterNodesQuery;
+ private final List clientsA;
+ private final List clientsB;
+ private List usingClients;
+
+ public RemoteClientManager(ModuleManager moduleManager) {
+ this.moduleManager = moduleManager;
+ this.clientsA = new LinkedList<>();
+ this.clientsB = new LinkedList<>();
+ this.usingClients = clientsA;
+ }
+
+ public void start() {
+ this.clusterNodesQuery = moduleManager.find(ClusterModule.NAME).getService(ClusterNodesQuery.class);
+ this.indicatorMapper = moduleManager.find(ClusterModule.NAME).getService(IndicatorMapper.class);
+ Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::refresh, 1, 2, TimeUnit.SECONDS);
+ }
+
+ private void refresh() {
+ List instanceList = clusterNodesQuery.queryRemoteNodes();
+ Collections.sort(instanceList);
+
+ if (!compare(instanceList)) {
+ buildNewClients(instanceList);
+ }
+ }
+
+ public List getRemoteClient() {
+ return usingClients;
+ }
+
+ private List getFreeClients() {
+ if (usingClients.equals(clientsA)) {
+ return clientsB;
+ } else {
+ return clientsA;
+ }
+ }
+
+ private void switchCurrentClients() {
+ if (usingClients.equals(clientsA)) {
+ usingClients = clientsB;
+ } else {
+ usingClients = clientsA;
+ }
+ }
+
+ private void buildNewClients(List remoteInstances) {
+ getFreeClients().clear();
+
+ Map currentClientsMap = new HashMap<>();
+ this.usingClients.forEach(remoteClient -> {
+ currentClientsMap.put(address(remoteClient.getHost(), remoteClient.getPort()), remoteClient);
+ });
+
+ remoteInstances.forEach(remoteInstance -> {
+ String address = address(remoteInstance.getHost(), remoteInstance.getPort());
+ RemoteClient client;
+ if (currentClientsMap.containsKey(address)) {
+ client = currentClientsMap.get(address);
+ } else {
+ if (remoteInstance.isSelf()) {
+ client = new SelfRemoteClient(moduleManager, remoteInstance.getHost(), remoteInstance.getPort());
+ } else {
+ client = new GRPCRemoteClient(indicatorMapper, remoteInstance, 1, 3000);
+ }
+ }
+ getFreeClients().add(client);
+ });
+
+ switchCurrentClients();
+ }
+
+ private boolean compare(List remoteInstances) {
+ if (usingClients.size() == remoteInstances.size()) {
+ for (int i = 0; i < usingClients.size(); i++) {
+ if (!address(usingClients.get(i).getHost(), usingClients.get(i).getPort()).equals(address(remoteInstances.get(i).getHost(), remoteInstances.get(i).getPort()))) {
+ return false;
+ }
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private String address(String host, int port) {
+ return host + String.valueOf(port);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
new file mode 100644
index 0000000000..109ff77981
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+/**
+ * @author peng-yongsheng
+ */
+public class SelfRemoteClient implements RemoteClient {
+
+ private final ModuleManager moduleManager;
+ private final String host;
+ private final int port;
+
+ public SelfRemoteClient(ModuleManager moduleManager, String host, int port) {
+ this.moduleManager = moduleManager;
+ this.host = host;
+ this.port = port;
+ }
+
+ @Override public String getHost() {
+ return host;
+ }
+
+ @Override public int getPort() {
+ return port;
+ }
+
+ @Override public void push(int nextWorkerId, Indicator indicator) {
+ WorkerMapper workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class);
+ workerMapper.findInstanceById(nextWorkerId).in(indicator);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/ForeverFirstSelector.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/ForeverFirstSelector.java
new file mode 100644
index 0000000000..e28f2031fd
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/ForeverFirstSelector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.selector;
+
+import java.util.List;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClient;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class ForeverFirstSelector implements RemoteClientSelector {
+
+ private static final Logger logger = LoggerFactory.getLogger(ForeverFirstSelector.class);
+
+ @Override public RemoteClient select(List clients, Indicator indicator) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("clients size: {}", clients.size());
+ }
+ return clients.get(0);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/HashCodeSelector.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/HashCodeSelector.java
new file mode 100644
index 0000000000..3d256b5ea9
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/HashCodeSelector.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.selector;
+
+import java.util.List;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClient;
+
+/**
+ * @author peng-yongsheng
+ */
+public class HashCodeSelector implements RemoteClientSelector {
+
+ @Override public RemoteClient select(List clients, Indicator indicator) {
+ int size = clients.size();
+ int selectIndex = Math.abs(indicator.hashCode()) % size;
+ return clients.get(selectIndex);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RemoteClientSelector.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RemoteClientSelector.java
new file mode 100644
index 0000000000..438cbad0b2
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RemoteClientSelector.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.selector;
+
+import java.util.List;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClient;
+
+/**
+ * @author peng-yongsheng
+ */
+public interface RemoteClientSelector {
+ RemoteClient select(List clients, Indicator indicator);
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RollingSelector.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RollingSelector.java
new file mode 100644
index 0000000000..c74e8c3641
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RollingSelector.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.selector;
+
+import java.util.List;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClient;
+
+/**
+ * @author peng-yongsheng
+ */
+public class RollingSelector implements RemoteClientSelector {
+
+ private int index = 0;
+
+ @Override public RemoteClient select(List clients, Indicator indicator) {
+ int size = clients.size();
+ index++;
+ int selectIndex = Math.abs(index) % size;
+
+ if (index == Integer.MAX_VALUE) {
+ index = 0;
+ }
+ return clients.get(selectIndex);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Selector.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/Selector.java
similarity index 93%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Selector.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/Selector.java
index b7ac6ed005..bd51648961 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Selector.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/Selector.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.oap.server.core.remote;
+package org.apache.skywalking.oap.server.core.remote.selector;
/**
* @author peng-yongsheng
diff --git a/oap-server/server-core/src/main/proto/RemoteService.proto b/oap-server/server-core/src/main/proto/RemoteService.proto
new file mode 100644
index 0000000000..410b0c31e4
--- /dev/null
+++ b/oap-server/server-core/src/main/proto/RemoteService.proto
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "org.apache.skywalking.oap.server.core.remote.grpc.proto";
+
+service RemoteService {
+ rpc call (stream RemoteMessage) returns (Empty) {
+ }
+}
+
+message RemoteMessage {
+ int32 nextWorkerId = 1;
+ int32 indicatorId = 2;
+ RemoteData remoteData = 3;
+}
+
+message RemoteData {
+ repeated string dataStrings = 1;
+ repeated int64 dataLongs = 2;
+ repeated double dataDoubles = 3;
+ repeated int32 dataIntegers = 4;
+}
+
+message Empty {
+}
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/resources/META-INF/defines/indicator.def b/oap-server/server-core/src/main/resources/META-INF/defines/indicator.def
new file mode 100644
index 0000000000..ce6b6ddc30
--- /dev/null
+++ b/oap-server/server-core/src/main/resources/META-INF/defines/indicator.def
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointLatencyAvgIndicator
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/resources/META-INF/defines/worker.def b/oap-server/server-core/src/main/resources/META-INF/defines/worker.def
new file mode 100644
index 0000000000..5afc8db522
--- /dev/null
+++ b/oap-server/server-core/src/main/resources/META-INF/defines/worker.def
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointLatencyAvgAggregateWorker
+org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointLatencyAvgRemoteWorker
+org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointLatencyAvgPersistentWorker
\ No newline at end of file
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapperTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapperTestCase.java
new file mode 100644
index 0000000000..4b58eb567f
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapperTestCase.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.indicator.define;
+
+import org.junit.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class IndicatorMapperTestCase {
+
+ @Test
+ public void test() throws IndicatorDefineLoadException {
+ IndicatorMapper mapper = new IndicatorMapper();
+ mapper.load();
+
+ Assert.assertEquals(1, mapper.findIdByClass(TestAvgIndicator.class));
+ Assert.assertEquals(TestAvgIndicator.class, mapper.findClassById(1));
+ }
+}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
new file mode 100644
index 0000000000..17d1189eed
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.indicator.define;
+
+import lombok.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+
+/**
+ * @author peng-yongsheng
+ */
+public class TestAvgIndicator extends AvgIndicator {
+
+ @Setter @Getter private int id;
+
+ @Override public RemoteData.Builder serialize() {
+ return null;
+ }
+
+ @Override public void deserialize(RemoteData remoteData) {
+ }
+}
diff --git a/oap-server/server-core/src/test/resources/META-INF/defines/indicator.def b/oap-server/server-core/src/test/resources/META-INF/defines/indicator.def
new file mode 100644
index 0000000000..97491fffba
--- /dev/null
+++ b/oap-server/server-core/src/test/resources/META-INF/defines/indicator.def
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.core.analysis.indicator.define.TestAvgIndicator
\ No newline at end of file
diff --git a/oap-server/server-core/src/test/resources/META-INF/defines/worker.def b/oap-server/server-core/src/test/resources/META-INF/defines/worker.def
new file mode 100644
index 0000000000..33ebbb1f39
--- /dev/null
+++ b/oap-server/server-core/src/test/resources/META-INF/defines/worker.def
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
\ No newline at end of file
diff --git a/oap-server/server-core/src/test/resources/log4j2.xml b/oap-server/server-core/src/test/resources/log4j2.xml
new file mode 100644
index 0000000000..6eb5b3fb98
--- /dev/null
+++ b/oap-server/server-core/src/test/resources/log4j2.xml
@@ -0,0 +1,31 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
index 2416be6398..188fae35c8 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.library.client.grpc;
import io.grpc.*;
+import lombok.Getter;
import org.apache.skywalking.oap.server.library.client.Client;
/**
@@ -26,9 +27,9 @@ import org.apache.skywalking.oap.server.library.client.Client;
*/
public class GRPCClient implements Client {
- private final String host;
+ @Getter private final String host;
- private final int port;
+ @Getter private final int port;
private ManagedChannel channel;
diff --git a/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleProvider.java b/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleProvider.java
index 7596fb60b9..9873303c0e 100644
--- a/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleProvider.java
+++ b/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleProvider.java
@@ -18,8 +18,7 @@
package org.apache.skywalking.oap.server.library.module;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
/**
* The ModuleProvider is an implementation of a {@link ModuleDefine}.
diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/mesh-receiver-provider/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/provider/MeshGRPCHandler.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/mesh-receiver-provider/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/provider/MeshGRPCHandler.java
index e9764ce564..8893821060 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/mesh-receiver-provider/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/provider/MeshGRPCHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/mesh-receiver-provider/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/provider/MeshGRPCHandler.java
@@ -35,8 +35,6 @@ public class MeshGRPCHandler extends ServiceMeshMetricServiceGrpc.ServiceMeshMet
if (logger.isDebugEnabled()) {
logger.debug("Received mesh metric: {}", metric);
}
-
-
}
@Override public void onError(Throwable throwable) {
--
GitLab