提交 ff2a161e 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Merge pull request #209 from wu-sheng/feature/208-agentside

Provide new data protocol for segment messages.
package org.skywalking.apm.trace;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.TypeAdapter;
import com.google.gson.annotations.JsonAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
......@@ -18,7 +11,6 @@ import java.util.List;
*
* @author wusheng
*/
@JsonAdapter(SegmentsMessage.Serializer.class)
public class SegmentsMessage {
private List<TraceSegment> segments;
......@@ -34,40 +26,23 @@ public class SegmentsMessage {
return Collections.unmodifiableList(segments);
}
public static class Serializer extends TypeAdapter<SegmentsMessage> {
@Override
public void write(JsonWriter out, SegmentsMessage value) throws IOException {
Gson gson = new GsonBuilder()
.excludeFieldsWithoutExposeAnnotation()
.create();
out.beginArray();
try {
for (TraceSegment segment : value.segments) {
out.jsonValue(gson.toJson(segment));
}
} finally {
out.endArray();
}
}
@Override
public SegmentsMessage read(JsonReader in) throws IOException {
SegmentsMessage message = new SegmentsMessage();
in.beginArray();
Gson gson = new GsonBuilder()
.excludeFieldsWithoutExposeAnnotation()
.create();
try {
while (in.hasNext()) {
TraceSegment traceSegment = gson.fromJson(in, TraceSegment.class);
message.append(traceSegment);
}
} finally {
in.endArray();
}
return message;
/**
* This serialization mechanism started from 3.1, it is similar to network package.
* The data protocol is
*
* segment1.json.length + ' '(one blank space) + segment1.json
* + segment2.json.length + ' '(one blank space) + segment2.json
* + etc.
*
* @param gson the serializer for {@link TraceSegment}
* @return the string represents the <code>SegmentMessage</code>
*/
public String serialize(Gson gson) {
StringBuilder buffer = new StringBuilder();
for (TraceSegment segment : segments) {
String segmentJson = gson.toJson(segment);
buffer.append(segmentJson.length()).append(' ').append(segmentJson);
}
return buffer.toString();
}
}
......@@ -108,15 +108,18 @@ public class TraceSegmentTestCase {
SegmentsMessage message = new SegmentsMessage();
message.append(segment);
String json = gson.toJson(message);
message = gson.fromJson(json, SegmentsMessage.class);
String jsonString = message.serialize(gson);
int length = Integer.parseInt(jsonString.substring(0, 4));
TraceSegment newSegment = message.getSegments().get(0);
String segmentJson = jsonString.substring(5);
Assert.assertEquals(segment.getSpans().size(), newSegment.getSpans().size());
Assert.assertEquals(segment.getRefs().get(0).getTraceSegmentId(), newSegment.getRefs().get(0).getTraceSegmentId());
Assert.assertEquals(Tags.SPAN_LAYER.get(segment.getSpans().get(1)), Tags.SPAN_LAYER.get(newSegment.getSpans().get(1)));
Assert.assertEquals(segment.getSpans().get(1).getLogs().get(0).getTime(), newSegment.getSpans().get(1).getLogs().get(0).getTime());
Assert.assertEquals(length, segmentJson.length());
TraceSegment recoverySegment = gson.fromJson(segmentJson, TraceSegment.class);
Assert.assertEquals(segment.getSpans().size(), recoverySegment.getSpans().size());
Assert.assertEquals(segment.getRefs().get(0).getTraceSegmentId(), recoverySegment.getRefs().get(0).getTraceSegmentId());
Assert.assertEquals(Tags.SPAN_LAYER.get(segment.getSpans().get(1)), Tags.SPAN_LAYER.get(recoverySegment.getSpans().get(1)));
Assert.assertEquals(segment.getSpans().get(1).getLogs().get(0).getTime(), recoverySegment.getSpans().get(1).getLogs().get(0).getTime());
}
}
......@@ -32,6 +32,7 @@ public class CollectorClient implements Runnable {
private static long SLEEP_TIME_MILLIS = 500;
private String[] serverList;
private volatile int selectedServer = -1;
private Gson serializer;
public CollectorClient() {
serverList = Config.Collector.SERVERS.split(",");
......@@ -39,6 +40,9 @@ public class CollectorClient implements Runnable {
if (serverList.length > 0) {
selectedServer = r.nextInt(serverList.length);
}
serializer = new GsonBuilder()
.excludeFieldsWithoutExposeAnnotation()
.create();
}
@Override
......@@ -84,10 +88,7 @@ public class CollectorClient implements Runnable {
if (message == null) {
return;
}
Gson gson = new GsonBuilder()
.excludeFieldsWithoutExposeAnnotation()
.create();
String messageJson = gson.toJson(message);
String messageJson = message.serialize(serializer);
CloseableHttpClient httpClient = HttpClients.custom().build();
try {
HttpPost httpPost = ready2Send(messageJson);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册