提交 ced19c22 编写于 作者: P pengys5 提交者: zhangxin

use provider to initialize config

上级 ae36838b
package com.a.eye.skywalking.collector;
import akka.actor.ActorSystem;
import com.a.eye.skywalking.api.util.StringUtil;
import com.a.eye.skywalking.collector.cluster.ClusterConfig;
import com.a.eye.skywalking.collector.cluster.Const;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public enum AkkaSystem {
INSTANCE;
private Logger logger = LogManager.getFormatterLogger(AkkaSystem.class);
public ActorSystem create() {
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + ClusterConfig.Cluster.Current.hostname).
withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + ClusterConfig.Cluster.Current.port)).
withFallback(ConfigFactory.load("application.conf"));
if (!StringUtil.isEmpty(ClusterConfig.Cluster.seed_nodes)) {
config.withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes=" + generateSeedNodes()));
}
return ActorSystem.create(Const.SystemName, config);
}
private String generateSeedNodes() {
String[] seedNodes = ClusterConfig.Cluster.seed_nodes.split(",");
String akkaSeedNodes = "";
for (int i = 0; i < seedNodes.length; i++) {
String akkaNodeName = "\"akka.tcp://" + Const.SystemName + "@" + seedNodes[i] + "\"";
if (i > 0) {
akkaSeedNodes += ",";
}
akkaSeedNodes += akkaNodeName;
}
akkaSeedNodes = "[" + akkaSeedNodes + "]";
logger.info("config seedNodes: %s, generate seedNodes: %s", ClusterConfig.Cluster.seed_nodes, akkaSeedNodes);
return akkaSeedNodes;
}
}
......@@ -2,16 +2,13 @@ package com.a.eye.skywalking.collector;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.a.eye.skywalking.api.util.StringUtil;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.cluster.ClusterConfig;
import com.a.eye.skywalking.collector.cluster.Const;
import com.a.eye.skywalking.collector.cluster.WorkersListener;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.a.eye.skywalking.collector.config.ConfigInitializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.ServiceLoader;
/**
......@@ -26,26 +23,16 @@ public class CollectorSystem {
return clusterContext;
}
public void boot() throws UsedRoleNameException, ProviderNotFoundException {
public void boot() throws UsedRoleNameException, ProviderNotFoundException, IOException, IllegalAccessException {
ConfigInitializer.INSTANCE.initialize();
createAkkaSystem();
createListener();
loadLocalProviders();
createClusterWorkers();
}
public void terminate() {
clusterContext.getAkkaSystem().terminate();
}
private void createAkkaSystem() {
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + ClusterConfig.Cluster.Current.hostname).
withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + ClusterConfig.Cluster.Current.port)).
withFallback(ConfigFactory.load("application.conf"));
if (!StringUtil.isEmpty(ClusterConfig.Cluster.seed_nodes)) {
config.withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes=" + ClusterConfig.Cluster.seed_nodes));
}
ActorSystem akkaSystem = ActorSystem.create(Const.SystemName, config);
ActorSystem akkaSystem = AkkaSystem.INSTANCE.create();
clusterContext = new ClusterWorkerContext(akkaSystem);
}
......
......@@ -14,8 +14,8 @@ public class ClusterConfig {
public static class Cluster {
public static class Current {
public static String hostname = "127.0.0.1";
public static String port = "2551";
public static String hostname = "";
public static String port = "";
public static String roles = "";
}
......
package com.a.eye.skywalking.collector.cluster;
import com.a.eye.skywalking.api.util.ConfigInitializer;
import com.a.eye.skywalking.api.util.StringUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.InputStream;
import java.util.Properties;
import com.a.eye.skywalking.collector.config.ConfigProvider;
/**
* <code>ClusterConfigInitializer</code> Contains static methods for setting
* {@link ClusterConfig} attributes value.
*
* <p>
* The priority of value setting is
* system property -> collector.config -> {@link ClusterConfig} default value
* <p>
*
* @author pengys5
*/
public class ClusterConfigInitializer {
private static Logger logger = LogManager.getFormatterLogger(ClusterConfigInitializer.class);
public static final String ConfigFileName = "collector.config";
public class ClusterConfigProvider implements ConfigProvider {
/**
* Read config file to setting {@link ClusterConfig} then get system property to overwrite it.
*
* @param configFileName is the config file name, the file format is key-value pairs
*/
public static void initialize(String configFileName) {
InputStream configFileStream = ClusterConfigInitializer.class.getResourceAsStream("/" + configFileName);
if (configFileStream == null) {
logger.info("Not provide sky-walking certification documents, sky-walking api run in default config.");
} else {
try {
Properties properties = new Properties();
properties.load(configFileStream);
ConfigInitializer.initialize(properties, ClusterConfig.class);
} catch (Exception e) {
logger.error("Failed to read the config file, sky-walking api run in default config.", e);
}
}
@Override
public Class configClass() {
return ClusterConfig.class;
}
@Override
public void cliArgs() {
if (!StringUtil.isEmpty(System.getProperty("cluster.current.hostname"))) {
ClusterConfig.Cluster.Current.hostname = System.getProperty("cluster.current.hostname");
}
......
package com.a.eye.skywalking.collector.config;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.ServiceLoader;
/**
* @author pengys5
*/
public enum ConfigInitializer {
INSTANCE;
private Logger logger = LogManager.getFormatterLogger(ConfigInitializer.class);
public void initialize() throws IOException, IllegalAccessException {
InputStream configFileStream = ConfigInitializer.class.getResourceAsStream("/collector.config");
initializeConfigFile(configFileStream);
ServiceLoader<ConfigProvider> configProviders = ServiceLoader.load(ConfigProvider.class);
for (ConfigProvider provider : configProviders) {
provider.cliArgs();
}
}
private void initializeConfigFile(InputStream configFileStream) throws IOException, IllegalAccessException {
ServiceLoader<ConfigProvider> configProviders = ServiceLoader.load(ConfigProvider.class);
Properties properties = new Properties();
properties.load(configFileStream);
for (ConfigProvider provider : configProviders) {
logger.info("configProvider provider name: %s", provider.getClass().getName());
Class configClass = provider.configClass();
com.a.eye.skywalking.api.util.ConfigInitializer.initialize(properties, configClass);
}
}
}
package com.a.eye.skywalking.collector.config;
/**
* @author pengys5
*/
public interface ConfigProvider {
Class configClass();
void cliArgs();
}
package com.a.eye.skywalking.collector.config;
import com.a.eye.skywalking.collector.cluster.ClusterConfig;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* @author pengys5
*/
public class ConfigInitializerTestCase {
@Before
public void clear() {
System.clearProperty("cluster.current.hostname");
System.clearProperty("cluster.current.port");
System.clearProperty("cluster.current.roles");
System.clearProperty("cluster.seed_nodes");
}
@Test
public void testInitialize() throws Exception {
ConfigInitializer.INSTANCE.initialize();
Assert.assertEquals("127.0.0.1", ClusterConfig.Cluster.Current.hostname);
Assert.assertEquals("1000", ClusterConfig.Cluster.Current.port);
Assert.assertEquals("Test, Test1", ClusterConfig.Cluster.Current.roles);
Assert.assertEquals("127.0.0.1:1000", ClusterConfig.Cluster.seed_nodes);
}
@Test
public void testInitializeWithCli() throws Exception {
System.setProperty("cluster.current.hostname", "127.0.0.2");
System.setProperty("cluster.current.port", "1001");
System.setProperty("cluster.current.roles", "Test1, Test2");
System.setProperty("cluster.seed_nodes", "127.0.0.1:1000, 127.0.0.1:1001");
ConfigInitializer.INSTANCE.initialize();
Assert.assertEquals("127.0.0.2", ClusterConfig.Cluster.Current.hostname);
Assert.assertEquals("1001", ClusterConfig.Cluster.Current.port);
Assert.assertEquals("Test1, Test2", ClusterConfig.Cluster.Current.roles);
Assert.assertEquals("127.0.0.1:1000, 127.0.0.1:1001", ClusterConfig.Cluster.seed_nodes);
}
}
cluster.current.hostname = 127.0.0.1
cluster.current.port = 1000
cluster.current.roles = [Test, Test1]
cluster.current.hostname=127.0.0.1
cluster.current.port=1000
cluster.current.roles=WorkersListener
cluster.seed_nodes=127.0.0.1:1000
cluster.nodes=["akka.tcp://CollectorSystem@127.0.0.1:1000", "akka.tcp://CollectorSystem@127.0.0.1:1001", "akka.tcp://CollectorSystem@127.0.0.1:1002"]
\ No newline at end of file
es.cluster.name=CollectorDBCluster
es.cluster.nodes=127.0.0.1:9300
es.cluster.transport.sniffer=true
es.index.shards.number=2
es.index.replicas.number=0
\ No newline at end of file
......@@ -2,7 +2,6 @@ package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.CollectorSystem;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.cluster.ClusterConfigInitializer;
import com.a.eye.skywalking.collector.worker.httpserver.HttpServer;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.IndexCreator;
......@@ -18,8 +17,6 @@ public class CollectorBootStartUp {
public static void main(String[] args) throws Exception {
logger.info("collector system starting....");
ClusterConfigInitializer.initialize("collector.config");
CollectorSystem collectorSystem = new CollectorSystem();
collectorSystem.boot();
EsClient.INSTANCE.boot();
......
......@@ -3,6 +3,7 @@ package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.worker.config.CacheSizeConfig;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import com.a.eye.skywalking.collector.worker.storage.MergePersistenceData;
......@@ -24,7 +25,7 @@ public abstract class MergeAnalysisMember extends AnalysisMember {
final protected void setMergeData(String id, String column, String value) throws Exception {
getPersistenceData().getElseCreate(id).setMergeData(column, value);
if (getPersistenceData().size() >= WorkerConfig.Persistence.Data.size) {
if (getPersistenceData().size() >= CacheSizeConfig.Cache.Analysis.size) {
aggregation();
}
}
......
......@@ -3,6 +3,7 @@ package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.worker.config.CacheSizeConfig;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import com.a.eye.skywalking.collector.worker.storage.MergePersistenceData;
......@@ -42,7 +43,7 @@ public abstract class MergePersistenceMember extends PersistenceMember {
if (message instanceof MergeData) {
MergeData mergeData = (MergeData) message;
getPersistenceData().getElseCreate(mergeData.getId()).merge(mergeData);
if (getPersistenceData().size() >= WorkerConfig.Persistence.Data.size) {
if (getPersistenceData().size() >= CacheSizeConfig.Cache.Persistence.size) {
persistence();
}
} else {
......
......@@ -3,6 +3,7 @@ package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.worker.config.CacheSizeConfig;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
import com.a.eye.skywalking.collector.worker.storage.MetricPersistenceData;
......@@ -19,7 +20,7 @@ public abstract class MetricAnalysisMember extends AnalysisMember {
final protected void setMetric(String id, String column, Long value) throws Exception {
persistenceData.getElseCreate(id).setMetric(column, value);
if (persistenceData.size() >= WorkerConfig.Persistence.Data.size) {
if (persistenceData.size() >= CacheSizeConfig.Cache.Persistence.size) {
aggregation();
}
}
......
......@@ -3,6 +3,7 @@ package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.worker.config.CacheSizeConfig;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
import com.a.eye.skywalking.collector.worker.storage.MetricPersistenceData;
......@@ -37,7 +38,7 @@ public abstract class MetricPersistenceMember extends PersistenceMember {
if (message instanceof MetricData) {
MetricData metricData = (MetricData) message;
persistenceData.getElseCreate(metricData.getId()).merge(metricData);
if (persistenceData.size() >= WorkerConfig.Persistence.Data.size) {
if (persistenceData.size() >= CacheSizeConfig.Cache.Persistence.size) {
persistence();
}
} else {
......
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.config.CacheSizeConfig;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.storage.RecordPersistenceData;
import com.google.gson.JsonObject;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public abstract class RecordAnalysisMember extends AnalysisMember {
private Logger logger = LogManager.getFormatterLogger(RecordAnalysisMember.class);
private RecordPersistenceData persistenceData = new RecordPersistenceData();
public RecordAnalysisMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
......@@ -27,7 +21,7 @@ public abstract class RecordAnalysisMember extends AnalysisMember {
final public void setRecord(String id, JsonObject record) throws Exception {
persistenceData.getElseCreate(id).setRecord(record);
if (persistenceData.size() >= WorkerConfig.Analysis.Data.size) {
if (persistenceData.size() >= CacheSizeConfig.Cache.Analysis.size) {
aggregation();
}
}
......
......@@ -3,6 +3,7 @@ package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.worker.config.CacheSizeConfig;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.storage.RecordPersistenceData;
......@@ -39,7 +40,7 @@ public abstract class RecordPersistenceMember extends PersistenceMember {
RecordData recordData = (RecordData) message;
logger.debug("setRecord: id: %s, data: %s", recordData.getId(), recordData.getRecord());
getPersistenceData().getElseCreate(recordData.getId()).setRecord(recordData.getRecord());
if (getPersistenceData().size() >= WorkerConfig.Persistence.Data.size) {
if (getPersistenceData().size() >= CacheSizeConfig.Cache.Persistence.size) {
persistence();
}
} else {
......
package com.a.eye.skywalking.collector.worker.config;
/**
* @author pengys5
*/
public class CacheSizeConfig {
public static class Cache {
public static class Analysis {
public static int size = 1000;
}
public static class Persistence {
public static int size = 1000;
}
}
}
package com.a.eye.skywalking.collector.worker.config;
import com.a.eye.skywalking.collector.config.ConfigProvider;
/**
* @author pengys5
*/
public class CacheSizeConfigProvider implements ConfigProvider {
@Override
public Class configClass() {
return CacheSizeConfig.class;
}
@Override
public void cliArgs() {
}
}
package com.a.eye.skywalking.collector.worker.config;
/**
* @author pengys5
*/
public class EsConfig {
public static class Es {
public static class Cluster {
public static String name = "";
public static String nodes = "";
public static class Transport {
public static String sniffer = "";
}
}
public static class Index {
public static class Shards {
public static String number;
}
public static class Replicas{
public static String number;
}
}
}
}
package com.a.eye.skywalking.collector.worker.config;
import com.a.eye.skywalking.api.util.StringUtil;
import com.a.eye.skywalking.collector.config.ConfigProvider;
/**
* @author pengys5
*/
public class EsConfigProvider implements ConfigProvider {
@Override
public Class configClass() {
return EsConfig.class;
}
@Override
public void cliArgs() {
if (!StringUtil.isEmpty(System.getProperty("es.cluster.name"))) {
EsConfig.Es.Cluster.name = System.getProperty("es.cluster.name");
}
if (!StringUtil.isEmpty(System.getProperty("es.cluster.nodes"))) {
EsConfig.Es.Cluster.nodes = System.getProperty("es.cluster.nodes");
}
if (!StringUtil.isEmpty(System.getProperty("es.cluster.transport.sniffer"))) {
EsConfig.Es.Cluster.Transport.sniffer = System.getProperty("es.cluster.transport.sniffer");
}
if (!StringUtil.isEmpty(System.getProperty("es.index.shards.number"))) {
EsConfig.Es.Index.Shards.number = System.getProperty("es.index.shards.number");
}
if (!StringUtil.isEmpty(System.getProperty("es.index.replicas.number"))) {
EsConfig.Es.Index.Replicas.number = System.getProperty("es.index.replicas.number");
}
}
}
package com.a.eye.skywalking.collector.worker.config;
/**
* @author pengys5
*/
public class HttpConfig {
public static class Http {
public static String hostname = "";
public static String port = "";
public static String contextPath = "";
}
}
package com.a.eye.skywalking.collector.worker.config;
import com.a.eye.skywalking.api.util.StringUtil;
import com.a.eye.skywalking.collector.config.ConfigProvider;
/**
* @author pengys5
*/
public class HttpConfigProvider implements ConfigProvider {
@Override
public Class configClass() {
return HttpConfig.class;
}
@Override
public void cliArgs() {
if (!StringUtil.isEmpty(System.getProperty("http.hostname"))) {
HttpConfig.Http.hostname = System.getProperty("http.hostname");
}
if (!StringUtil.isEmpty(System.getProperty("http.port"))) {
HttpConfig.Http.port = System.getProperty("http.port");
}
if (!StringUtil.isEmpty(System.getProperty("http.contextPath"))) {
HttpConfig.Http.contextPath = System.getProperty("http.contextPath");
}
}
}
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.cluster.ClusterConfig;
package com.a.eye.skywalking.collector.worker.config;
/**
* @author pengys5
*/
public class WorkerConfig extends ClusterConfig {
public static class Analysis {
public static class Data {
public static int size = 1000;
}
}
public static class Persistence {
public static class Data {
public static int size = 1000;
}
}
public class WorkerConfig {
public static class WorkerNum {
public static class Node {
......@@ -105,18 +91,6 @@ public class WorkerConfig extends ClusterConfig {
public static int Size = 1024;
}
public static class NodeRefDayAnalysis {
public static int Size = 1024;
}
public static class NodeRefHourAnalysis {
public static int Size = 1024;
}
public static class NodeRefMinuteAnalysis {
public static int Size = 1024;
}
public static class NodeMappingDayAnalysis {
public static int Size = 1024;
}
......@@ -147,6 +121,18 @@ public class WorkerConfig extends ClusterConfig {
}
public static class NodeRef {
public static class NodeRefDayAnalysis {
public static int Size = 1024;
}
public static class NodeRefHourAnalysis {
public static int Size = 1024;
}
public static class NodeRefMinuteAnalysis {
public static int Size = 1024;
}
public static class NodeRefDaySave {
public static int Size = 1024;
}
......
package com.a.eye.skywalking.collector.worker.config;
import com.a.eye.skywalking.collector.config.ConfigProvider;
/**
* @author pengys5
*/
public class WorkerConfigProvider implements ConfigProvider {
@Override
public Class configClass() {
return WorkerConfig.class;
}
@Override
public void cliArgs() {
}
}
......@@ -6,7 +6,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.MergeAnalysisMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
import com.a.eye.skywalking.collector.worker.globaltrace.persistence.GlobalTraceAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
......
......@@ -3,7 +3,7 @@ package com.a.eye.skywalking.collector.worker.globaltrace.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......
......@@ -7,7 +7,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.MergePersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
/**
......
......@@ -15,7 +15,7 @@ public abstract class AbstractHttpServlet extends HttpServlet {
final public void reply(HttpServletResponse response, JsonObject resJson, int status) throws IOException {
response.setContentType("text/json");
response.setCharacterEncoding("utf-8");
response.setStatus(HttpServletResponse.SC_OK);
response.setStatus(status);
PrintWriter out = response.getWriter();
out.print(resJson);
......
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -15,6 +14,7 @@ import java.io.IOException;
/**
* @author pengys5
*/
public abstract class AbstractPost extends AbstractLocalAsyncWorker {
private Logger logger = LogManager.getFormatterLogger(AbstractPost.class);
......@@ -27,10 +27,9 @@ public abstract class AbstractPost extends AbstractLocalAsyncWorker {
final public void onWork(Object request) throws Exception {
if (request instanceof String) {
onReceive((String) request);
} else if (request instanceof EndOfBatchCommand) {
} else {
logger.error("unhandled request, request instance must String, but is %s", request.getClass().toString());
saveException(new IllegalArgumentException("request instance must String"));
}
}
......@@ -40,7 +39,7 @@ public abstract class AbstractPost extends AbstractLocalAsyncWorker {
private final LocalAsyncWorkerRef ownerWorkerRef;
protected PostWithHttpServlet(LocalAsyncWorkerRef ownerWorkerRef) {
PostWithHttpServlet(LocalAsyncWorkerRef ownerWorkerRef) {
this.ownerWorkerRef = ownerWorkerRef;
}
......@@ -49,7 +48,7 @@ public abstract class AbstractPost extends AbstractLocalAsyncWorker {
JsonObject resJson = new JsonObject();
try {
BufferedReader bufferedReader = request.getReader();
StringBuffer dataStr = new StringBuffer();
StringBuilder dataStr = new StringBuilder();
String tmpStr;
while ((tmpStr = bufferedReader.readLine()) != null) {
dataStr.append(tmpStr);
......
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.worker.config.HttpConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import java.net.InetSocketAddress;
/**
* @author pengys5
*/
......@@ -16,9 +18,9 @@ public enum HttpServer {
private Logger logger = LogManager.getFormatterLogger(HttpServer.class);
public void boot(ClusterWorkerContext clusterContext) throws Exception {
Server server = new Server(7001);
Server server = new Server(new InetSocketAddress(HttpConfig.Http.hostname, Integer.valueOf(HttpConfig.Http.port)));
String contextPath = "/";
String contextPath = HttpConfig.Http.contextPath;
ServletContextHandler servletContextHandler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
servletContextHandler.setContextPath(contextPath);
logger.info("http server root context path: %s", contextPath);
......
......@@ -5,7 +5,7 @@ import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.persistence.NodeCompAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
......
......@@ -5,7 +5,7 @@ import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.persistence.NodeMappingDayAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
......
......@@ -5,7 +5,7 @@ import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.persistence.NodeMappingHourAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
......
......@@ -5,7 +5,7 @@ import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.persistence.NodeMappingMinuteAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
......
......@@ -3,7 +3,7 @@ package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
/**
......
......@@ -6,7 +6,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.NodeCompIndex;
/**
......
......@@ -3,7 +3,7 @@ package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
/**
......
......@@ -6,7 +6,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
/**
......
......@@ -3,7 +3,7 @@ package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
/**
......
......@@ -6,7 +6,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
/**
......
......@@ -3,7 +3,7 @@ package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
/**
......
......@@ -6,7 +6,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
/**
......
......@@ -6,7 +6,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefDayAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
......@@ -70,7 +70,7 @@ public class NodeRefDayAnalysis extends AbstractNodeRefAnalysis {
@Override
public int queueSize() {
return WorkerConfig.Queue.Node.NodeRefDayAnalysis.Size;
return WorkerConfig.Queue.NodeRef.NodeRefDayAnalysis.Size;
}
}
......
......@@ -6,7 +6,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefHourAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
......@@ -70,7 +70,7 @@ public class NodeRefHourAnalysis extends AbstractNodeRefAnalysis {
@Override
public int queueSize() {
return WorkerConfig.Queue.Node.NodeRefHourAnalysis.Size;
return WorkerConfig.Queue.NodeRef.NodeRefHourAnalysis.Size;
}
}
......
......@@ -6,7 +6,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefMinuteAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
......@@ -70,7 +70,7 @@ public class NodeRefMinuteAnalysis extends AbstractNodeRefAnalysis {
@Override
public int queueSize() {
return WorkerConfig.Queue.Node.NodeRefMinuteAnalysis.Size;
return WorkerConfig.Queue.NodeRef.NodeRefMinuteAnalysis.Size;
}
}
......
......@@ -5,7 +5,7 @@ import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefResSumDayAgg;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
......
......@@ -5,7 +5,7 @@ import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefResSumHourAgg;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
......
......@@ -5,7 +5,7 @@ import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefResSumMinuteAgg;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
......
......@@ -3,7 +3,7 @@ package com.a.eye.skywalking.collector.worker.noderef.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......
......@@ -6,7 +6,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex;
/**
......
......@@ -3,7 +3,7 @@ package com.a.eye.skywalking.collector.worker.noderef.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......
......@@ -6,7 +6,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex;
/**
......
......@@ -3,7 +3,7 @@ package com.a.eye.skywalking.collector.worker.noderef.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......
......@@ -6,7 +6,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex;
/**
......
......@@ -3,7 +3,7 @@ package com.a.eye.skywalking.collector.worker.noderef.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......
......@@ -6,7 +6,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.MetricPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.noderef.NodeRefResSumIndex;
/**
......
......@@ -3,7 +3,7 @@ package com.a.eye.skywalking.collector.worker.noderef.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......
......@@ -6,7 +6,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.MetricPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.noderef.NodeRefResSumIndex;
/**
......
......@@ -3,7 +3,7 @@ package com.a.eye.skywalking.collector.worker.noderef.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......
......@@ -6,7 +6,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.MetricPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.noderef.NodeRefResSumIndex;
/**
......
......@@ -7,7 +7,7 @@ import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.globaltrace.analysis.GlobalTraceAnalysis;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractPost;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractPostProvider;
......
......@@ -7,7 +7,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.segment.SegmentCostIndex;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
......
......@@ -7,7 +7,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.segment.SegmentExceptionIndex;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.storage.AbstractIndex;
......
......@@ -7,7 +7,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.segment.SegmentIndex;
import com.a.eye.skywalking.collector.worker.storage.AbstractIndex;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
......
package com.a.eye.skywalking.collector.worker.storage;
import com.a.eye.skywalking.collector.worker.config.EsConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
......@@ -32,8 +33,8 @@ public abstract class AbstractIndex {
final XContentBuilder createSettingBuilder() throws IOException {
return XContentFactory.jsonBuilder()
.startObject()
.field("index.number_of_shards", 2)
.field("index.number_of_replicas", 0)
.field("index.number_of_shards", EsConfig.Es.Index.Shards.number)
.field("index.number_of_replicas", EsConfig.Es.Index.Replicas.number)
.endObject();
}
......
package com.a.eye.skywalking.collector.worker.storage;
import com.a.eye.skywalking.collector.worker.config.EsConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
......@@ -11,6 +12,8 @@ import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
/**
* @author pengys5
......@@ -18,16 +21,22 @@ import java.net.UnknownHostException;
public enum EsClient {
INSTANCE;
private Logger logger = LogManager.getFormatterLogger(EsClient.class);
private Client client;
public void boot() throws UnknownHostException {
Settings settings = Settings.builder()
.put("cluster.name", "CollectorCluster")
.put("client.transport.sniff", true)
.put("cluster.name", EsConfig.Es.Cluster.name)
.put("client.transport.sniff", EsConfig.Es.Cluster.Transport.sniffer)
.build();
client = new PreBuiltTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));
client = new PreBuiltTransportClient(settings);
List<AddressPairs> pairsList = parseClusterNodes(EsConfig.Es.Cluster.nodes);
for (AddressPairs pairs : pairsList) {
((PreBuiltTransportClient) client).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(pairs.ip), pairs.port));
}
}
public Client getClient() {
......@@ -35,7 +44,6 @@ public enum EsClient {
}
public void indexRefresh(String... indexName) {
Logger logger = LogManager.getFormatterLogger(EsClient.class);
RefreshResponse response = client.admin().indices().refresh(new RefreshRequest(indexName)).actionGet();
if (response.getShardFailures().length == response.getTotalShards()) {
logger.error("All elasticsearch shard index refresh failure, reason: %s", response.getShardFailures());
......@@ -44,4 +52,28 @@ public enum EsClient {
}
logger.info("elasticsearch index refresh success");
}
private List<AddressPairs> parseClusterNodes(String nodes) {
List<AddressPairs> pairsList = new ArrayList<>();
logger.info("es nodes: %s", nodes);
String[] nodesSplit = nodes.split(",");
for (int i = 0; i < nodesSplit.length; i++) {
String node = nodesSplit[i];
String ip = node.split(":")[0];
String port = node.split(":")[1];
pairsList.add(new AddressPairs(ip, Integer.valueOf(port)));
}
return pairsList;
}
class AddressPairs {
private String ip;
private Integer port;
public AddressPairs(String ip, Integer port) {
this.ip = ip;
this.port = port;
}
}
}
com.a.eye.skywalking.collector.cluster.ClusterConfigProvider
com.a.eye.skywalking.collector.worker.config.EsConfigProvider
com.a.eye.skywalking.collector.worker.config.HttpConfigProvider
com.a.eye.skywalking.collector.worker.config.CacheSizeConfigProvider
com.a.eye.skywalking.collector.worker.config.WorkerConfigProvider
\ No newline at end of file
com.a.eye.skywalking.collector.worker.noderef.NodeRefGetWithTimeSlice$Factory
com.a.eye.skywalking.collector.worker.noderef.NodeRefResSumGetWithTimeSlice$Factory
com.a.eye.skywalking.collector.worker.noderef.NodeRefResSumGetGroupWithTimeSlice$Factory
com.a.eye.skywalking.collector.worker.segment.SegmentTopGetWithTimeSlice$Factory
com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceGetWithGlobalId$Factory
......
cluster.current.hostname=127.0.0.1
cluster.current.port=1000
cluster.current.roles=[WorkersListener]
#cluster.seed_nodes=["akka.tcp://CollectorSystem@127.0.0.1:1000"]
cluster.current.roles=WorkersListener
cluster.seed_nodes=127.0.0.1:1000,127.0.0.1:1001
es.cluster.name=CollectorDBCluster
es.cluster.nodes=127.0.0.1:9300
es.cluster.transport.sniffer=true
es.index.shards.number=2
es.index.replicas.number=0
http.hostname=127.0.0.1
http.port=7001
http.contextPath=/
cache.analysis.size=1024
cache.persistence.size=1024
WorkerNum.Node.NodeCompAgg.Value=10
WorkerNum.Node.NodeMappingDayAgg.Value=10
WorkerNum.Node.NodeMappingHourAgg.Value=10
WorkerNum.Node.NodeMappingMinuteAgg.Value=10
WorkerNum.NodeRef.NodeRefDayAgg.Value=10
WorkerNum.NodeRef.NodeRefHourAgg.Value=10
WorkerNum.NodeRef.NodeRefMinuteAgg.Value=10
WorkerNum.NodeRef.NodeRefResSumDayAgg.Value=10
WorkerNum.NodeRef.NodeRefResSumHourAgg.Value=10
WorkerNum.NodeRef.NodeRefResSumMinuteAgg.Value=10
WorkerNum.GlobalTrace.GlobalTraceAgg.Value=10
Queue.GlobalTrace.GlobalTraceSave.Size=1024
Queue.GlobalTrace.GlobalTraceAnalysis.Size=1024
Queue.Segment.SegmentPost.Size=1024
Queue.Segment.SegmentCostSave.Size=1024
Queue.Segment.SegmentSave.Size=1024
Queue.Segment.SegmentExceptionSave.Size=1024
Queue.Node.NodeCompAnalysis.Size=1024
Queue.Node.NodeMappingDayAnalysis.Size=1024
Queue.Node.NodeMappingHourAnalysis.Size=1024
Queue.Node.NodeMappingMinuteAnalysis.Size=1024
Queue.Node.NodeCompSave.Size=1024
Queue.Node.NodeMappingDaySave.Size=1024
Queue.Node.NodeMappingHourSave.Size=1024
Queue.Node.NodeMappingMinuteSave.Size=1024
Queue.NodeRef.NodeRefDayAnalysis.Size=1024
Queue.NodeRef.NodeRefHourAnalysis.Size=1024
Queue.NodeRef.NodeRefMinuteAnalysis.Size=1024
Queue.NodeRef.NodeRefDaySave.Size=1024
Queue.NodeRef.NodeRefHourSave.Size=1024
Queue.NodeRef.NodeRefMinuteSave.Size=1024
Queue.NodeRef.NodeRefResSumDaySave.Size=1024
Queue.NodeRef.NodeRefResSumHourSave.Size=1024
Queue.NodeRef.NodeRefResSumMinuteSave.Size=1024
Queue.NodeRef.NodeRefResSumDayAnalysis.Size=1024
Queue.NodeRef.NodeRefResSumHourAnalysis.Size=1024
Queue.NodeRef.NodeRefResSumMinuteAnalysis.Size=1024
es.cluster.name=
es.cluster.nodes=
......@@ -2,6 +2,7 @@ package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.worker.config.CacheSizeConfig;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import com.a.eye.skywalking.collector.worker.storage.MergePersistenceData;
import org.junit.Assert;
......@@ -44,7 +45,7 @@ public class MergeAnalysisMemberTestCase {
@Test
public void testSetMergeDataNotFull() throws Exception {
when(persistenceData.size()).thenReturn(WorkerConfig.Persistence.Data.size - 1);
when(persistenceData.size()).thenReturn(CacheSizeConfig.Cache.Analysis.size - 1);
mergeAnalysisMember.setMergeData("segment_1", "column", "value");
Mockito.verify(mergeAnalysisMember, Mockito.never()).aggregation();
......@@ -52,7 +53,7 @@ public class MergeAnalysisMemberTestCase {
@Test
public void testSetMergeDataFull() throws Exception {
when(persistenceData.size()).thenReturn(WorkerConfig.Persistence.Data.size);
when(persistenceData.size()).thenReturn(CacheSizeConfig.Cache.Analysis.size);
mergeAnalysisMember.setMergeData("segment_1", "column", "value");
Mockito.verify(mergeAnalysisMember, Mockito.times(1)).aggregation();
......
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import com.a.eye.skywalking.collector.cluster.ClusterConfig;
import com.a.eye.skywalking.collector.cluster.ClusterConfigInitializer;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.sniffer.mock.trace.TraceSegmentBuilderFactory;
import com.a.eye.skywalking.trace.TraceSegment;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
/**
* @author pengys5
*/
public class StartUpTestCase {
public void test() throws Exception {
System.out.println(SegmentPost.class.getSimpleName());
ClusterConfigInitializer.initialize("collector.config");
System.out.println(ClusterConfig.Cluster.Current.roles);
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + ClusterConfig.Cluster.Current.hostname).
withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + ClusterConfig.Cluster.Current.port)).
withFallback(ConfigFactory.parseString("akka.cluster.roles=" + ClusterConfig.Cluster.Current.roles)).
withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes=" + ClusterConfig.Cluster.seed_nodes)).
withFallback(ConfigFactory.load("application.conf"));
ActorSystem system = ActorSystem.create("CollectorSystem", config);
EsClient.INSTANCE.boot();
TraceSegment dubboClientData = TraceSegmentBuilderFactory.INSTANCE.traceOf_Tomcat_DubboClient();
// SegmentMessage.Builder clientBuilder = dubboClientData.serialize().toBuilder();
// clientBuilder.setApplicationCode("Tomcat_DubboClient");
//
// dubboClientData = new TraceSegment(clientBuilder.build());
//
// TraceSegment dubboServerData = TraceSegmentBuilderFactory.INSTANCE.traceOf_DubboServer_MySQL();
//
// SegmentMessage serializeServer = dubboServerData.serialize();
// SegmentMessage.Builder builder = serializeServer.toBuilder();
//
// SegmentRefMessage.Builder builderRef = builder.getRefs(0).toBuilder();
// builderRef.setApplicationCode(dubboClientData.getApplicationCode());
//
//
// builderRef.setPeerHost(Tags.PEER_HOST.get(dubboClientData.getSpans().get(1)));
//
// builder.setApplicationCode("DubboServer_MySQL");
// builder.addRefs(builderRef);
// dubboServerData = new TraceSegment(builder.build());
Thread.sleep(5000);
ActorSelection selection = system.actorSelection("/user/TraceSegmentReceiver_1");
for (int i = 0; i < 100; i++) {
selection.tell(dubboClientData, ActorRef.noSender());
// selection.tell(dubboServerData, ActorRef.noSender());
Thread.sleep(200);
}
Thread.sleep(1000000);
}
}
......@@ -4,7 +4,7 @@ import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.globaltrace.persistence.GlobalTraceAgg;
import com.a.eye.skywalking.collector.worker.mock.MergeDataAnswer;
import com.a.eye.skywalking.collector.worker.segment.mock.SegmentMock;
......
......@@ -5,7 +5,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.mock.MergeDataAnswer;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.MergeDataAggTools;
......
......@@ -3,7 +3,7 @@ package com.a.eye.skywalking.collector.worker.globaltrace.persistence;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
import org.junit.Assert;
import org.junit.Before;
......
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.Test;
import org.mockito.Mockito;
/**
* @author pengys5
*/
public class AbstractGetProviderTestCase {
@Test
public void testCreate() throws IllegalArgumentException, ProviderNotFoundException {
ServletContextHandler handler = Mockito.mock(ServletContextHandler.class);
TestAbstractGetProvider provider = new TestAbstractGetProvider();
provider.create(handler);
Mockito.verify(handler).addServlet(Mockito.any(ServletHolder.class), Mockito.eq("servletPath"));
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.google.gson.JsonObject;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.mockito.Mockito.*;
/**
* @author pengys5
*/
public class AbstractGetTestCase {
private TestAbstractGet get;
@Before
public void init() {
get = mock(TestAbstractGet.class);
}
@Test
public void testOnWork() throws Exception {
Map<String, String[]> parameterMap = new HashMap<>();
JsonObject response = new JsonObject();
get.onWork(parameterMap, response);
verify(get).onSearch(any(Map.class), any(JsonObject.class));
}
@Test
public void testOnWorkError() throws Exception {
Map<String, String[]> parameterMap = new HashMap<>();
JsonObject response = new JsonObject();
doThrow(new Exception("testOnWorkError")).when(get).onSearch(any(Map.class), any(JsonObject.class));
get.onWork(parameterMap, response);
Assert.assertEquals(false, response.get("isSuccess").getAsBoolean());
Assert.assertEquals("testOnWorkError", response.get("reason").getAsString());
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.Test;
import org.mockito.Mockito;
/**
* @author pengys5
*/
public class AbstractPostProviderTestCase {
@Test
public void testCreate() throws IllegalArgumentException, ProviderNotFoundException {
ServletContextHandler handler = Mockito.mock(ServletContextHandler.class);
TestAbstractPostProvider provider = new TestAbstractPostProvider();
provider.create(handler);
Mockito.verify(handler).addServlet(Mockito.any(ServletHolder.class), Mockito.eq("testPost"));
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.google.gson.JsonObject;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.*;
/**
* @author pengys5
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({TestAbstractPost.class})
@PowerMockIgnore({"javax.management.*"})
public class AbstractPostTestCase {
private TestAbstractPost post;
@Before
public void init() {
ClusterWorkerContext clusterWorkerContext = mock(ClusterWorkerContext.class);
LocalWorkerContext localWorkerContext = mock(LocalWorkerContext.class);
post = spy(new TestAbstractPost(TestAbstractPost.WorkerRole.INSTANCE, clusterWorkerContext, localWorkerContext));
}
@Test
public void testOnWork() throws Exception {
String request = "testOnWork";
post.onWork(request);
verify(post).onReceive(anyString());
}
@Test
public void testOnWorkError() throws Exception {
post.onWork(new JsonObject());
PowerMockito.verifyPrivate(post).invoke("saveException", any(IllegalArgumentException.class));
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.LocalSyncWorkerRef;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import static org.mockito.Mockito.*;
/**
* @author pengys5
*/
public class GetWithHttpServletTestCase {
@Test
public void testDoGet() throws IOException, ServletException {
LocalSyncWorkerRef workerRef = mock(LocalSyncWorkerRef.class);
AbstractGet.GetWithHttpServlet servlet = new AbstractGet.GetWithHttpServlet(workerRef);
HttpServletRequest request = mock(HttpServletRequest.class);
HttpServletResponse response = mock(HttpServletResponse.class);
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Integer status = (Integer) invocation.getArguments()[0];
System.out.println(status);
Assert.assertEquals(new Integer(200), status);
return null;
}
}).when(response).setStatus(anyInt());
PrintWriter writer = mock(PrintWriter.class);
when(response.getWriter()).thenReturn(writer);
servlet.doGet(request, response);
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.LocalAsyncWorkerRef;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.BufferedReader;
import java.io.PrintWriter;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.*;
/**
* @author pengys5
*/
public class PostWithHttpServletTestCase {
private LocalAsyncWorkerRef workerRef;
private AbstractPost.PostWithHttpServlet servlet;
private HttpServletRequest request;
private HttpServletResponse response;
private PrintWriter writer;
@Before
public void init() throws Exception {
workerRef = mock(LocalAsyncWorkerRef.class);
servlet = new AbstractPost.PostWithHttpServlet(workerRef);
request = mock(HttpServletRequest.class);
response = mock(HttpServletResponse.class);
writer = mock(PrintWriter.class);
when(response.getWriter()).thenReturn(writer);
}
@Test
public void testDoPost() throws Exception {
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Integer status = (Integer) invocation.getArguments()[0];
System.out.println(status);
Assert.assertEquals(new Integer(200), status);
return null;
}
}).when(response).setStatus(anyInt());
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
String reqStr = (String) invocation.getArguments()[0];
System.out.println(reqStr);
Assert.assertEquals("TestTest2", reqStr);
return null;
}
}).when(workerRef).tell(anyString());
BufferedReader bufferedReader = mock(BufferedReader.class);
when(bufferedReader.readLine()).thenReturn("Test").thenReturn("Test2").thenReturn(null);
when(request.getReader()).thenReturn(bufferedReader);
servlet.doPost(request, response);
}
@Test
public void testDoPostError() throws Exception {
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Integer status = (Integer) invocation.getArguments()[0];
System.out.println(status);
Assert.assertEquals(new Integer(500), status);
return null;
}
}).when(response).setStatus(anyInt());
doThrow(new Exception()).when(workerRef).tell(anyString());
servlet.doPost(request, response);
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.google.gson.JsonObject;
import java.util.Map;
/**
* @author pengys5
*/
public class TestAbstractGet extends AbstractGet {
protected TestAbstractGet(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override
protected void onSearch(Map<String, String[]> request, JsonObject response) throws Exception {
}
public static class Factory extends AbstractGetProvider<TestAbstractGet> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return TestAbstractGet.WorkerRole.INSTANCE;
}
@Override
public TestAbstractGet workerInstance(ClusterWorkerContext clusterContext) {
return new TestAbstractGet(role(), clusterContext, new LocalWorkerContext());
}
@Override
public String servletPath() {
return "/TestAbstractGet";
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return TestAbstractGet.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.Role;
/**
* @author pengys5
*/
public class TestAbstractGetProvider extends AbstractGetProvider {
@Override
public String servletPath() {
return "servletPath";
}
@Override
public Role role() {
return null;
}
@Override
public AbstractWorker workerInstance(ClusterWorkerContext clusterContext) {
return new TestAbstractGet(null, null, null);
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
/**
* @author pengys5
*/
public class TestAbstractPost extends AbstractPost {
public TestAbstractPost(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override
protected void onReceive(String reqJsonStr) throws Exception {
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return TestAbstractPost.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
public static class Factory extends AbstractPostProvider<TestAbstractPost> {
public static Factory INSTANCE = new Factory();
@Override
public String servletPath() {
return "/TestAbstractPost";
}
@Override
public int queueSize() {
return 4;
}
@Override
public Role role() {
return TestAbstractPost.WorkerRole.INSTANCE;
}
@Override
public TestAbstractPost workerInstance(ClusterWorkerContext clusterContext) {
return new TestAbstractPost(role(), clusterContext, new LocalWorkerContext());
}
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.Role;
/**
* @author pengys5
*/
public class TestAbstractPostProvider extends AbstractPostProvider {
@Override
public int queueSize() {
return 4;
}
@Override
public String servletPath() {
return "testPost";
}
@Override
public Role role() {
return null;
}
@Override
public AbstractWorker workerInstance(ClusterWorkerContext clusterContext) {
return new TestAbstractPost(null, null, null);
}
}
......@@ -4,7 +4,7 @@ import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.datamerge.RecordDataMergeJson;
import com.a.eye.skywalking.collector.worker.mock.RecordDataAnswer;
import com.a.eye.skywalking.collector.worker.node.persistence.NodeCompAgg;
......
......@@ -5,7 +5,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.datamerge.RecordDataMergeJson;
import com.a.eye.skywalking.collector.worker.mock.RecordDataAnswer;
import com.a.eye.skywalking.collector.worker.node.persistence.NodeMappingDayAgg;
......
......@@ -4,7 +4,7 @@ import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.datamerge.RecordDataMergeJson;
import com.a.eye.skywalking.collector.worker.mock.RecordDataAnswer;
import com.a.eye.skywalking.collector.worker.node.persistence.NodeMappingHourAgg;
......
......@@ -4,7 +4,7 @@ import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.datamerge.RecordDataMergeJson;
import com.a.eye.skywalking.collector.worker.mock.RecordDataAnswer;
import com.a.eye.skywalking.collector.worker.node.persistence.NodeMappingMinuteAgg;
......
......@@ -5,12 +5,10 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.Const;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.mock.RecordDataAnswer;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.RecordDataAggTools;
import com.a.eye.skywalking.collector.worker.tools.RecordDataTool;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
......
......@@ -3,9 +3,8 @@ package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.NodeCompIndex;
import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
......
......@@ -5,7 +5,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.mock.RecordDataAnswer;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.RecordDataAggTools;
......
......@@ -3,7 +3,7 @@ package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
import org.junit.Assert;
import org.junit.Before;
......
......@@ -5,7 +5,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.mock.RecordDataAnswer;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.RecordDataAggTools;
......
......@@ -3,7 +3,7 @@ package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
import org.junit.Assert;
import org.junit.Before;
......
......@@ -5,7 +5,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.mock.RecordDataAnswer;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.RecordDataAggTools;
......
......@@ -3,7 +3,7 @@ package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
import org.junit.Assert;
import org.junit.Before;
......
......@@ -5,7 +5,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.mock.RecordDataAnswer;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefDayAgg;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
......@@ -72,7 +72,7 @@ public class NodeRefDayAnalysisTestCase {
Assert.assertEquals(NodeRefDayAnalysis.class.getSimpleName(), NodeRefDayAnalysis.Factory.INSTANCE.workerInstance(null).getClass().getSimpleName());
int testSize = 10;
WorkerConfig.Queue.Node.NodeRefDayAnalysis.Size = testSize;
WorkerConfig.Queue.NodeRef.NodeRefDayAnalysis.Size = testSize;
Assert.assertEquals(testSize, NodeRefDayAnalysis.Factory.INSTANCE.queueSize());
}
......
......@@ -5,7 +5,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.mock.RecordDataAnswer;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefHourAgg;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
......@@ -71,7 +71,7 @@ public class NodeRefHourAnalysisTestCase {
Assert.assertEquals(NodeRefHourAnalysis.class.getSimpleName(), NodeRefHourAnalysis.Factory.INSTANCE.workerInstance(null).getClass().getSimpleName());
int testSize = 10;
WorkerConfig.Queue.Node.NodeRefHourAnalysis.Size = testSize;
WorkerConfig.Queue.NodeRef.NodeRefHourAnalysis.Size = testSize;
Assert.assertEquals(testSize, NodeRefHourAnalysis.Factory.INSTANCE.queueSize());
}
......
......@@ -5,7 +5,7 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.mock.RecordDataAnswer;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefMinuteAgg;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
......@@ -72,7 +72,7 @@ public class NodeRefMinuteAnalysisTestCase {
Assert.assertEquals(NodeRefMinuteAnalysis.class.getSimpleName(), NodeRefMinuteAnalysis.Factory.INSTANCE.workerInstance(null).getClass().getSimpleName());
int testSize = 10;
WorkerConfig.Queue.Node.NodeRefMinuteAnalysis.Size = testSize;
WorkerConfig.Queue.NodeRef.NodeRefMinuteAnalysis.Size = testSize;
Assert.assertEquals(testSize, NodeRefMinuteAnalysis.Factory.INSTANCE.queueSize());
}
......
......@@ -4,7 +4,7 @@ import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.mock.MetricDataAnswer;
import com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefResSumDayAgg;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册