diff --git a/skywalking-sniffer/skywalking-agent/src/main/java/com/a/eye/skywalking/agent/SkyWalkingAgent.java b/skywalking-sniffer/skywalking-agent/src/main/java/com/a/eye/skywalking/agent/SkyWalkingAgent.java index ea6ba500e33634891355fb909aa475f79c0f0a09..d2f2a659c169ed37e15cfce9ef606f50535b2d82 100644 --- a/skywalking-sniffer/skywalking-agent/src/main/java/com/a/eye/skywalking/agent/SkyWalkingAgent.java +++ b/skywalking-sniffer/skywalking-agent/src/main/java/com/a/eye/skywalking/agent/SkyWalkingAgent.java @@ -99,8 +99,8 @@ public class SkyWalkingAgent { private static void initConfig() { - Config.SkyWalking.IS_PREMAIN_MODE = true; - Config.SkyWalking.AGENT_BASE_PATH = initAgentBasePath(); + Config.Agent.IS_PREMAIN_MODE = true; + Config.Agent.PATH = initAgentBasePath(); SnifferConfigInitializer.initialize(); } diff --git a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/client/CollectorClient.java b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/client/CollectorClient.java index 63e6a8b9484dc578363ce5e5a76590bc85ca9534..b77f956255ffd78d0171e49f41a8befb74e13ccf 100644 --- a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/client/CollectorClient.java +++ b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/client/CollectorClient.java @@ -1,11 +1,21 @@ package com.a.eye.skywalking.api.client; import com.a.eye.skywalking.api.boot.ServiceManager; +import com.a.eye.skywalking.api.conf.Config; import com.a.eye.skywalking.api.queue.TraceSegmentProcessQueue; import com.a.eye.skywalking.logging.ILog; import com.a.eye.skywalking.logging.LogManager; +import com.a.eye.skywalking.trace.SegmentsMessage; import com.a.eye.skywalking.trace.TraceSegment; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import java.io.IOException; import java.util.List; +import java.util.Random; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy; import org.apache.http.impl.client.HttpClients; @@ -21,11 +31,18 @@ public class CollectorClient implements Runnable { private static ILog logger = LogManager.getLogger(CollectorClient.class); private static long SLEEP_TIME_MILLIS = 500; private CloseableHttpClient httpclient; + private String[] serverList; + private volatile int selectedServer = -1; public CollectorClient() { + serverList = Config.Collector.SERVERS.split(","); httpclient = HttpClients.custom() .setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy()) .build(); + Random r = new Random(); + if (serverList.length > 0) { + selectedServer = r.nextInt(serverList.length); + } } @Override @@ -36,13 +53,19 @@ public class CollectorClient implements Runnable { TraceSegmentProcessQueue segmentProcessQueue = ServiceManager.INSTANCE.findService(TraceSegmentProcessQueue.class); List cachedTraceSegments = segmentProcessQueue.getCachedTraceSegments(); if (cachedTraceSegments.size() > 0) { + SegmentsMessage message = null; + int count = 0; for (TraceSegment segment : cachedTraceSegments) { - /** - * No receiver found, means collector server is off-line. - */ - sleepTime = SLEEP_TIME_MILLIS * 10; - break; + if (message == null) { + message = new SegmentsMessage(); + } + message.append(segment); + if (count == Config.Collector.BATCH_SIZE) { + sendToCollector(message); + message = null; + } } + sendToCollector(message); } else { sleepTime = SLEEP_TIME_MILLIS; } @@ -56,6 +79,64 @@ public class CollectorClient implements Runnable { } } + /** + * Send the given {@link SegmentsMessage} to collector. + * + * @param message to be send. + */ + private void sendToCollector(SegmentsMessage message) throws RESTResponseStatusError, IOException { + if (message == null) { + return; + } + Gson gson = new GsonBuilder() + .excludeFieldsWithoutExposeAnnotation() + .create(); + String messageJson = gson.toJson(message); + + try { + HttpPost httpPost = ready2Send(messageJson); + if (httpPost != null) { + CloseableHttpResponse httpResponse = httpclient.execute(httpPost); + int statusCode = httpResponse.getStatusLine().getStatusCode(); + if (200 != statusCode) { + findBackupServer(); + throw new RESTResponseStatusError(statusCode); + } + } + } catch (IOException e) { + findBackupServer(); + throw e; + } + } + + /** + * Prepare the given message for HTTP Post service. + * + * @param messageJson to send + * @return {@link HttpPost}, when is ready to send. otherwise, null. + */ + private HttpPost ready2Send(String messageJson) { + if (selectedServer == -1) { + //no available server + return null; + } + HttpPost post = new HttpPost("http://" + serverList[selectedServer] + Config.Collector.SERVICE_NAME); + StringEntity entity = new StringEntity(messageJson, ContentType.APPLICATION_JSON); + post.setEntity(entity); + + return post; + } + + /** + * Choose the next server in {@link #serverList}, by moving {@link #selectedServer}. + */ + private void findBackupServer() { + selectedServer++; + if (selectedServer == serverList.length) { + selectedServer = 0; + } + } + /** * Try to sleep, and ignore the {@link InterruptedException} * diff --git a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/client/RESTResponseStatusError.java b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/client/RESTResponseStatusError.java new file mode 100644 index 0000000000000000000000000000000000000000..db74873bdcf9c210e3a315f4fedb4a30cf6869cc --- /dev/null +++ b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/client/RESTResponseStatusError.java @@ -0,0 +1,13 @@ +package com.a.eye.skywalking.api.client; + +/** + * The RESTResponseStatusError represents the REST-Service client got an unexpected response code. + * Most likely, the response code is not 200. + * + * @author wusheng + */ +class RESTResponseStatusError extends Exception { + RESTResponseStatusError(int responseCode){ + super("Unexpected service response code: " + responseCode); + } +} diff --git a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/conf/Config.java b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/conf/Config.java index e2cfae6d084d437a3ac389b7edb1744ca423a7ce..d13ad75e3db97dc92f2d511f347a839fa43c9bb6 100644 --- a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/conf/Config.java +++ b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/conf/Config.java @@ -2,20 +2,24 @@ package com.a.eye.skywalking.api.conf; public class Config { - public static class SkyWalking { + public static class Agent { public static String APPLICATION_CODE = ""; public static boolean IS_PREMAIN_MODE = false; - public static String AGENT_BASE_PATH = ""; + public static String PATH = ""; + } + public static class Collector{ public static String SERVERS = ""; public static String SERVICE_NAME = "/segments"; + + public static int BATCH_SIZE = 50; } - public static class Disruptor{ - public static int BUFFER_SIZE = 512; + public static class Buffer { + public static int SIZE = 512; } diff --git a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/conf/SnifferConfigInitializer.java b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/conf/SnifferConfigInitializer.java index 6b63b2a446c2678b3790f1fc70e88f4946ae7e4d..9c9fd2f17519706e87a536f81c52c0deb09b65a8 100644 --- a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/conf/SnifferConfigInitializer.java +++ b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/conf/SnifferConfigInitializer.java @@ -15,7 +15,7 @@ public class SnifferConfigInitializer { public static void initialize() { InputStream configFileStream; - if (Config.SkyWalking.IS_PREMAIN_MODE) { + if (Config.Agent.IS_PREMAIN_MODE) { configFileStream = fetchAuthFileInputStream(); } else { configFileStream = SnifferConfigInitializer.class.getResourceAsStream("/sky-walking.config"); @@ -35,24 +35,24 @@ public class SnifferConfigInitializer { String applicationCode = System.getProperty("applicationCode"); if (!StringUtil.isEmpty(applicationCode)) { - Config.SkyWalking.APPLICATION_CODE = applicationCode; + Config.Agent.APPLICATION_CODE = applicationCode; } String servers = System.getProperty("servers"); if(!StringUtil.isEmpty(servers)) { - Config.SkyWalking.SERVERS = servers; + Config.Collector.SERVERS = servers; } - if (StringUtil.isEmpty(Config.SkyWalking.APPLICATION_CODE)) { + if (StringUtil.isEmpty(Config.Agent.APPLICATION_CODE)) { throw new ExceptionInInitializerError("'-DapplicationCode=' is missing."); } - if (StringUtil.isEmpty(Config.SkyWalking.SERVERS)) { + if (StringUtil.isEmpty(Config.Collector.SERVERS)) { throw new ExceptionInInitializerError("'-Dservers=' is missing."); } } private static InputStream fetchAuthFileInputStream() { try { - return new FileInputStream(Config.SkyWalking.AGENT_BASE_PATH + File.separator + "sky-walking.config"); + return new FileInputStream(Config.Agent.PATH + File.separator + "sky-walking.config"); } catch (Exception e) { logger.warn("sky-walking.config is missing, use default config."); return null; diff --git a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/context/TracerContext.java b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/context/TracerContext.java index e484e0d4d3dcccc2a4d5d6e012794dd4b5fbec2c..6229bd10ec5d08b21bae2ff4a081141fa2af9f4a 100644 --- a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/context/TracerContext.java +++ b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/context/TracerContext.java @@ -34,7 +34,7 @@ public final class TracerContext { * Create a {@link TraceSegment} and init {@link #spanIdGenerator} as 0; */ TracerContext() { - this.segment = new TraceSegment(Config.SkyWalking.APPLICATION_CODE); + this.segment = new TraceSegment(Config.Agent.APPLICATION_CODE); this.spanIdGenerator = 0; } @@ -123,7 +123,7 @@ public final class TracerContext { public void inject(ContextCarrier carrier) { carrier.setTraceSegmentId(this.segment.getTraceSegmentId()); carrier.setSpanId(this.activeSpan().getSpanId()); - carrier.setApplicationCode(Config.SkyWalking.APPLICATION_CODE); + carrier.setApplicationCode(Config.Agent.APPLICATION_CODE); carrier.setPeerHost(Tags.PEER_HOST.get(activeSpan())); carrier.setDistributedTraceIds(this.segment.getRelatedGlobalTraces()); } diff --git a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/logging/SyncFileWriter.java b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/logging/SyncFileWriter.java index 99d0811434b83f7bf27af5752cfcecf9b101c5a3..72b305f2b1abca1a1871425e5fbe98e3844039c6 100644 --- a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/logging/SyncFileWriter.java +++ b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/logging/SyncFileWriter.java @@ -19,7 +19,7 @@ public class SyncFileWriter implements IWriter { private SyncFileWriter() { try { - File logFilePath = new File(Config.SkyWalking.AGENT_BASE_PATH, Config.Logging.LOG_DIR_NAME); + File logFilePath = new File(Config.Agent.PATH, Config.Logging.LOG_DIR_NAME); if (!logFilePath.exists()) { logFilePath.mkdirs(); } diff --git a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/logging/WriterFactory.java b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/logging/WriterFactory.java index fc38edfeebf34ccb3b7052e02834835690cde687..6e5bd423001f3a0cd61036fc47893c7bb06604df 100644 --- a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/logging/WriterFactory.java +++ b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/logging/WriterFactory.java @@ -4,7 +4,7 @@ import com.a.eye.skywalking.api.conf.Config; public class WriterFactory { public static IWriter getLogWriter(){ - if (Config.SkyWalking.IS_PREMAIN_MODE){ + if (Config.Agent.IS_PREMAIN_MODE){ return SyncFileWriter.instance(); }else{ return new STDOutWriter(); diff --git a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/queue/TraceSegmentProcessQueue.java b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/queue/TraceSegmentProcessQueue.java index d868de9f4016e9a34696be8d2087972af211a039..9d25f94ff39bd28e1a96e726ae43abfc3711d6fe 100644 --- a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/queue/TraceSegmentProcessQueue.java +++ b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/queue/TraceSegmentProcessQueue.java @@ -30,8 +30,8 @@ public class TraceSegmentProcessQueue extends StatusBootService implements Trace private volatile int cacheIndex; public TraceSegmentProcessQueue() { - disruptor = new Disruptor(TraceSegmentHolder.Factory.INSTANCE, Config.Disruptor.BUFFER_SIZE, DaemonThreadFactory.INSTANCE); - secondLevelCache = new TraceSegment[Config.Disruptor.BUFFER_SIZE]; + disruptor = new Disruptor(TraceSegmentHolder.Factory.INSTANCE, Config.Buffer.SIZE, DaemonThreadFactory.INSTANCE); + secondLevelCache = new TraceSegment[Config.Buffer.SIZE]; cacheIndex = 0; disruptor.handleEventsWith(this); buffer = disruptor.getRingBuffer(); diff --git a/skywalking-sniffer/skywalking-api/src/test/java/com/a/eye/skywalking/api/conf/SnifferConfigInitializerTest.java b/skywalking-sniffer/skywalking-api/src/test/java/com/a/eye/skywalking/api/conf/SnifferConfigInitializerTest.java index 4804c74ba7336e5e52f64980bf1c8977f6b4bec1..befd85f85a9cfd8b37b3956a4200d60231f29eec 100644 --- a/skywalking-sniffer/skywalking-api/src/test/java/com/a/eye/skywalking/api/conf/SnifferConfigInitializerTest.java +++ b/skywalking-sniffer/skywalking-api/src/test/java/com/a/eye/skywalking/api/conf/SnifferConfigInitializerTest.java @@ -11,13 +11,13 @@ public class SnifferConfigInitializerTest { @Test public void testInitialize(){ - Config.SkyWalking.IS_PREMAIN_MODE = false; + Config.Agent.IS_PREMAIN_MODE = false; SnifferConfigInitializer.initialize(); - Assert.assertEquals("crmApp", Config.SkyWalking.APPLICATION_CODE); - Assert.assertEquals("127.0.0.1:8080", Config.SkyWalking.SERVERS); + Assert.assertEquals("crmApp", Config.Agent.APPLICATION_CODE); + Assert.assertEquals("127.0.0.1:8080", Config.Collector.SERVERS); - Assert.assertNotNull(Config.Disruptor.BUFFER_SIZE); + Assert.assertNotNull(Config.Buffer.SIZE); Assert.assertNotNull(Config.Logging.LOG_DIR_NAME); Assert.assertNotNull(Config.Logging.LOG_FILE_NAME); Assert.assertNotNull(Config.Logging.MAX_LOG_FILE_LENGTH); @@ -26,12 +26,12 @@ public class SnifferConfigInitializerTest { @Test(expected = ExceptionInInitializerError.class) public void testErrorInitialize(){ - Config.SkyWalking.IS_PREMAIN_MODE = true; + Config.Agent.IS_PREMAIN_MODE = true; SnifferConfigInitializer.initialize(); } @AfterClass public static void reset(){ - Config.SkyWalking.IS_PREMAIN_MODE = false; + Config.Agent.IS_PREMAIN_MODE = false; } } diff --git a/skywalking-sniffer/skywalking-api/src/test/java/com/a/eye/skywalking/api/logging/EasyLoggerTest.java b/skywalking-sniffer/skywalking-api/src/test/java/com/a/eye/skywalking/api/logging/EasyLoggerTest.java index 0e6cd274c38d5d0e06d4db452c7225e7983dd9ed..854798abc26d7dbee8cbcd3549cace8e7e0bff80 100644 --- a/skywalking-sniffer/skywalking-api/src/test/java/com/a/eye/skywalking/api/logging/EasyLoggerTest.java +++ b/skywalking-sniffer/skywalking-api/src/test/java/com/a/eye/skywalking/api/logging/EasyLoggerTest.java @@ -26,7 +26,7 @@ public class EasyLoggerTest { @Test public void testLog(){ - Config.SkyWalking.IS_PREMAIN_MODE = false; + Config.Agent.IS_PREMAIN_MODE = false; PrintStream output = Mockito.mock(PrintStream.class); System.setOut(output); diff --git a/skywalking-sniffer/skywalking-api/src/test/java/com/a/eye/skywalking/api/logging/WriterFactoryTest.java b/skywalking-sniffer/skywalking-api/src/test/java/com/a/eye/skywalking/api/logging/WriterFactoryTest.java index eec580928a95edccdff1539400afb73333bf1e27..d7adefd40a35a3f76c11d80714f126c8df9d11b5 100644 --- a/skywalking-sniffer/skywalking-api/src/test/java/com/a/eye/skywalking/api/logging/WriterFactoryTest.java +++ b/skywalking-sniffer/skywalking-api/src/test/java/com/a/eye/skywalking/api/logging/WriterFactoryTest.java @@ -25,18 +25,18 @@ public class WriterFactoryTest { */ @Test public void testGetLogWriter(){ - Config.SkyWalking.IS_PREMAIN_MODE = true; + Config.Agent.IS_PREMAIN_MODE = true; PrintStream mockStream = Mockito.mock(PrintStream.class); System.setErr(mockStream); Assert.assertEquals(SyncFileWriter.instance(), WriterFactory.getLogWriter()); - Config.SkyWalking.IS_PREMAIN_MODE = false; + Config.Agent.IS_PREMAIN_MODE = false; Assert.assertTrue(WriterFactory.getLogWriter() instanceof STDOutWriter); } @AfterClass public static void reset(){ - Config.SkyWalking.IS_PREMAIN_MODE = false; + Config.Agent.IS_PREMAIN_MODE = false; System.setErr(errRef); } } diff --git a/skywalking-sniffer/skywalking-api/src/test/resources/sky-walking.config b/skywalking-sniffer/skywalking-api/src/test/resources/sky-walking.config index 7817dd446d1a0019ad890d51119de05ea12c6360..b563dfb113ac14ca86e64d2f3031a53a3537f396 100644 --- a/skywalking-sniffer/skywalking-api/src/test/resources/sky-walking.config +++ b/skywalking-sniffer/skywalking-api/src/test/resources/sky-walking.config @@ -1,2 +1,2 @@ -skywalking.application_code = crmApp -skywalking.servers = 127.0.0.1:8080 +agent.application_code = crmApp +collector.servers = 127.0.0.1:8080 diff --git a/skywalking-sniffer/skywalking-sdk-plugin/dubbo-plugin/src/test/java/com/a/eye/skywalking/plugin/dubbo/DubboInterceptorTest.java b/skywalking-sniffer/skywalking-sdk-plugin/dubbo-plugin/src/test/java/com/a/eye/skywalking/plugin/dubbo/DubboInterceptorTest.java index 4e479b0392c23c9b73a0fb106424a655b1269a54..9f03be8bfebc3ade8e4a1208e02385a3048ea1cc 100644 --- a/skywalking-sniffer/skywalking-sdk-plugin/dubbo-plugin/src/test/java/com/a/eye/skywalking/plugin/dubbo/DubboInterceptorTest.java +++ b/skywalking-sniffer/skywalking-sdk-plugin/dubbo-plugin/src/test/java/com/a/eye/skywalking/plugin/dubbo/DubboInterceptorTest.java @@ -80,7 +80,7 @@ public class DubboInterceptorTest { Mockito.when(RpcContext.getContext()).thenReturn(rpcContext); when(rpcContext.isConsumerSide()).thenReturn(true); when(methodInvokeContext.allArguments()).thenReturn(new Object[]{invoker, invocation}); - Config.SkyWalking.APPLICATION_CODE = "DubboTestCases-APP"; + Config.Agent.APPLICATION_CODE = "DubboTestCases-APP"; }