提交 baf9bf69 编写于 作者: wu-sheng's avatar wu-sheng

Finish CollectorClient code. It is almost ready to send TraceSegement (s) to...

Finish CollectorClient code. It is almost ready to send TraceSegement (s) to collector in batch/fail-switch mode.
上级 4940a3c8
......@@ -99,8 +99,8 @@ public class SkyWalkingAgent {
private static void initConfig() {
Config.SkyWalking.IS_PREMAIN_MODE = true;
Config.SkyWalking.AGENT_BASE_PATH = initAgentBasePath();
Config.Agent.IS_PREMAIN_MODE = true;
Config.Agent.PATH = initAgentBasePath();
SnifferConfigInitializer.initialize();
}
......
package com.a.eye.skywalking.api.client;
import com.a.eye.skywalking.api.boot.ServiceManager;
import com.a.eye.skywalking.api.conf.Config;
import com.a.eye.skywalking.api.queue.TraceSegmentProcessQueue;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import com.a.eye.skywalking.trace.SegmentsMessage;
import com.a.eye.skywalking.trace.TraceSegment;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.http.impl.client.HttpClients;
......@@ -21,11 +31,18 @@ public class CollectorClient implements Runnable {
private static ILog logger = LogManager.getLogger(CollectorClient.class);
private static long SLEEP_TIME_MILLIS = 500;
private CloseableHttpClient httpclient;
private String[] serverList;
private volatile int selectedServer = -1;
public CollectorClient() {
serverList = Config.Collector.SERVERS.split(",");
httpclient = HttpClients.custom()
.setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy())
.build();
Random r = new Random();
if (serverList.length > 0) {
selectedServer = r.nextInt(serverList.length);
}
}
@Override
......@@ -36,13 +53,19 @@ public class CollectorClient implements Runnable {
TraceSegmentProcessQueue segmentProcessQueue = ServiceManager.INSTANCE.findService(TraceSegmentProcessQueue.class);
List<TraceSegment> cachedTraceSegments = segmentProcessQueue.getCachedTraceSegments();
if (cachedTraceSegments.size() > 0) {
SegmentsMessage message = null;
int count = 0;
for (TraceSegment segment : cachedTraceSegments) {
/**
* No receiver found, means collector server is off-line.
*/
sleepTime = SLEEP_TIME_MILLIS * 10;
break;
if (message == null) {
message = new SegmentsMessage();
}
message.append(segment);
if (count == Config.Collector.BATCH_SIZE) {
sendToCollector(message);
message = null;
}
}
sendToCollector(message);
} else {
sleepTime = SLEEP_TIME_MILLIS;
}
......@@ -56,6 +79,64 @@ public class CollectorClient implements Runnable {
}
}
/**
* Send the given {@link SegmentsMessage} to collector.
*
* @param message to be send.
*/
private void sendToCollector(SegmentsMessage message) throws RESTResponseStatusError, IOException {
if (message == null) {
return;
}
Gson gson = new GsonBuilder()
.excludeFieldsWithoutExposeAnnotation()
.create();
String messageJson = gson.toJson(message);
try {
HttpPost httpPost = ready2Send(messageJson);
if (httpPost != null) {
CloseableHttpResponse httpResponse = httpclient.execute(httpPost);
int statusCode = httpResponse.getStatusLine().getStatusCode();
if (200 != statusCode) {
findBackupServer();
throw new RESTResponseStatusError(statusCode);
}
}
} catch (IOException e) {
findBackupServer();
throw e;
}
}
/**
* Prepare the given message for HTTP Post service.
*
* @param messageJson to send
* @return {@link HttpPost}, when is ready to send. otherwise, null.
*/
private HttpPost ready2Send(String messageJson) {
if (selectedServer == -1) {
//no available server
return null;
}
HttpPost post = new HttpPost("http://" + serverList[selectedServer] + Config.Collector.SERVICE_NAME);
StringEntity entity = new StringEntity(messageJson, ContentType.APPLICATION_JSON);
post.setEntity(entity);
return post;
}
/**
* Choose the next server in {@link #serverList}, by moving {@link #selectedServer}.
*/
private void findBackupServer() {
selectedServer++;
if (selectedServer == serverList.length) {
selectedServer = 0;
}
}
/**
* Try to sleep, and ignore the {@link InterruptedException}
*
......
package com.a.eye.skywalking.api.client;
/**
* The <code>RESTResponseStatusError</code> represents the REST-Service client got an unexpected response code.
* Most likely, the response code is not 200.
*
* @author wusheng
*/
class RESTResponseStatusError extends Exception {
RESTResponseStatusError(int responseCode){
super("Unexpected service response code: " + responseCode);
}
}
......@@ -2,20 +2,24 @@ package com.a.eye.skywalking.api.conf;
public class Config {
public static class SkyWalking {
public static class Agent {
public static String APPLICATION_CODE = "";
public static boolean IS_PREMAIN_MODE = false;
public static String AGENT_BASE_PATH = "";
public static String PATH = "";
}
public static class Collector{
public static String SERVERS = "";
public static String SERVICE_NAME = "/segments";
public static int BATCH_SIZE = 50;
}
public static class Disruptor{
public static int BUFFER_SIZE = 512;
public static class Buffer {
public static int SIZE = 512;
}
......
......@@ -15,7 +15,7 @@ public class SnifferConfigInitializer {
public static void initialize() {
InputStream configFileStream;
if (Config.SkyWalking.IS_PREMAIN_MODE) {
if (Config.Agent.IS_PREMAIN_MODE) {
configFileStream = fetchAuthFileInputStream();
} else {
configFileStream = SnifferConfigInitializer.class.getResourceAsStream("/sky-walking.config");
......@@ -35,24 +35,24 @@ public class SnifferConfigInitializer {
String applicationCode = System.getProperty("applicationCode");
if (!StringUtil.isEmpty(applicationCode)) {
Config.SkyWalking.APPLICATION_CODE = applicationCode;
Config.Agent.APPLICATION_CODE = applicationCode;
}
String servers = System.getProperty("servers");
if(!StringUtil.isEmpty(servers)) {
Config.SkyWalking.SERVERS = servers;
Config.Collector.SERVERS = servers;
}
if (StringUtil.isEmpty(Config.SkyWalking.APPLICATION_CODE)) {
if (StringUtil.isEmpty(Config.Agent.APPLICATION_CODE)) {
throw new ExceptionInInitializerError("'-DapplicationCode=' is missing.");
}
if (StringUtil.isEmpty(Config.SkyWalking.SERVERS)) {
if (StringUtil.isEmpty(Config.Collector.SERVERS)) {
throw new ExceptionInInitializerError("'-Dservers=' is missing.");
}
}
private static InputStream fetchAuthFileInputStream() {
try {
return new FileInputStream(Config.SkyWalking.AGENT_BASE_PATH + File.separator + "sky-walking.config");
return new FileInputStream(Config.Agent.PATH + File.separator + "sky-walking.config");
} catch (Exception e) {
logger.warn("sky-walking.config is missing, use default config.");
return null;
......
......@@ -34,7 +34,7 @@ public final class TracerContext {
* Create a {@link TraceSegment} and init {@link #spanIdGenerator} as 0;
*/
TracerContext() {
this.segment = new TraceSegment(Config.SkyWalking.APPLICATION_CODE);
this.segment = new TraceSegment(Config.Agent.APPLICATION_CODE);
this.spanIdGenerator = 0;
}
......@@ -123,7 +123,7 @@ public final class TracerContext {
public void inject(ContextCarrier carrier) {
carrier.setTraceSegmentId(this.segment.getTraceSegmentId());
carrier.setSpanId(this.activeSpan().getSpanId());
carrier.setApplicationCode(Config.SkyWalking.APPLICATION_CODE);
carrier.setApplicationCode(Config.Agent.APPLICATION_CODE);
carrier.setPeerHost(Tags.PEER_HOST.get(activeSpan()));
carrier.setDistributedTraceIds(this.segment.getRelatedGlobalTraces());
}
......
......@@ -19,7 +19,7 @@ public class SyncFileWriter implements IWriter {
private SyncFileWriter() {
try {
File logFilePath = new File(Config.SkyWalking.AGENT_BASE_PATH, Config.Logging.LOG_DIR_NAME);
File logFilePath = new File(Config.Agent.PATH, Config.Logging.LOG_DIR_NAME);
if (!logFilePath.exists()) {
logFilePath.mkdirs();
}
......
......@@ -4,7 +4,7 @@ import com.a.eye.skywalking.api.conf.Config;
public class WriterFactory {
public static IWriter getLogWriter(){
if (Config.SkyWalking.IS_PREMAIN_MODE){
if (Config.Agent.IS_PREMAIN_MODE){
return SyncFileWriter.instance();
}else{
return new STDOutWriter();
......
......@@ -30,8 +30,8 @@ public class TraceSegmentProcessQueue extends StatusBootService implements Trace
private volatile int cacheIndex;
public TraceSegmentProcessQueue() {
disruptor = new Disruptor<TraceSegmentHolder>(TraceSegmentHolder.Factory.INSTANCE, Config.Disruptor.BUFFER_SIZE, DaemonThreadFactory.INSTANCE);
secondLevelCache = new TraceSegment[Config.Disruptor.BUFFER_SIZE];
disruptor = new Disruptor<TraceSegmentHolder>(TraceSegmentHolder.Factory.INSTANCE, Config.Buffer.SIZE, DaemonThreadFactory.INSTANCE);
secondLevelCache = new TraceSegment[Config.Buffer.SIZE];
cacheIndex = 0;
disruptor.handleEventsWith(this);
buffer = disruptor.getRingBuffer();
......
......@@ -11,13 +11,13 @@ public class SnifferConfigInitializerTest {
@Test
public void testInitialize(){
Config.SkyWalking.IS_PREMAIN_MODE = false;
Config.Agent.IS_PREMAIN_MODE = false;
SnifferConfigInitializer.initialize();
Assert.assertEquals("crmApp", Config.SkyWalking.APPLICATION_CODE);
Assert.assertEquals("127.0.0.1:8080", Config.SkyWalking.SERVERS);
Assert.assertEquals("crmApp", Config.Agent.APPLICATION_CODE);
Assert.assertEquals("127.0.0.1:8080", Config.Collector.SERVERS);
Assert.assertNotNull(Config.Disruptor.BUFFER_SIZE);
Assert.assertNotNull(Config.Buffer.SIZE);
Assert.assertNotNull(Config.Logging.LOG_DIR_NAME);
Assert.assertNotNull(Config.Logging.LOG_FILE_NAME);
Assert.assertNotNull(Config.Logging.MAX_LOG_FILE_LENGTH);
......@@ -26,12 +26,12 @@ public class SnifferConfigInitializerTest {
@Test(expected = ExceptionInInitializerError.class)
public void testErrorInitialize(){
Config.SkyWalking.IS_PREMAIN_MODE = true;
Config.Agent.IS_PREMAIN_MODE = true;
SnifferConfigInitializer.initialize();
}
@AfterClass
public static void reset(){
Config.SkyWalking.IS_PREMAIN_MODE = false;
Config.Agent.IS_PREMAIN_MODE = false;
}
}
......@@ -26,7 +26,7 @@ public class EasyLoggerTest {
@Test
public void testLog(){
Config.SkyWalking.IS_PREMAIN_MODE = false;
Config.Agent.IS_PREMAIN_MODE = false;
PrintStream output = Mockito.mock(PrintStream.class);
System.setOut(output);
......
......@@ -25,18 +25,18 @@ public class WriterFactoryTest {
*/
@Test
public void testGetLogWriter(){
Config.SkyWalking.IS_PREMAIN_MODE = true;
Config.Agent.IS_PREMAIN_MODE = true;
PrintStream mockStream = Mockito.mock(PrintStream.class);
System.setErr(mockStream);
Assert.assertEquals(SyncFileWriter.instance(), WriterFactory.getLogWriter());
Config.SkyWalking.IS_PREMAIN_MODE = false;
Config.Agent.IS_PREMAIN_MODE = false;
Assert.assertTrue(WriterFactory.getLogWriter() instanceof STDOutWriter);
}
@AfterClass
public static void reset(){
Config.SkyWalking.IS_PREMAIN_MODE = false;
Config.Agent.IS_PREMAIN_MODE = false;
System.setErr(errRef);
}
}
skywalking.application_code = crmApp
skywalking.servers = 127.0.0.1:8080
agent.application_code = crmApp
collector.servers = 127.0.0.1:8080
......@@ -80,7 +80,7 @@ public class DubboInterceptorTest {
Mockito.when(RpcContext.getContext()).thenReturn(rpcContext);
when(rpcContext.isConsumerSide()).thenReturn(true);
when(methodInvokeContext.allArguments()).thenReturn(new Object[]{invoker, invocation});
Config.SkyWalking.APPLICATION_CODE = "DubboTestCases-APP";
Config.Agent.APPLICATION_CODE = "DubboTestCases-APP";
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册