提交 e0d50533 编写于 作者: wu-sheng's avatar wu-sheng

New context carrier.

上级 cd1d3a3b
......@@ -134,13 +134,5 @@ public class Config {
*/
public static boolean TRACE_PARAM = false;
}
public static class Propagation {
/**
* The header name of cross process propagation data.
*/
public static String HEADER_NAME = "sw3";
}
}
}
package org.skywalking.apm.agent.core.context;
import java.util.Iterator;
/**
* @author wusheng
*/
public class CarrierItem implements Iterator<CarrierItem> {
private String headKey;
private String headValue;
private CarrierItem next;
public CarrierItem(String headKey, String headValue) {
this.headKey = headKey;
this.headValue = headValue;
next = null;
}
public CarrierItem(String headKey, String headValue, CarrierItem next) {
this.headKey = headKey;
this.headValue = headValue;
this.next = next;
}
public String getHeadKey() {
return headKey;
}
public String getHeadValue() {
return headValue;
}
public void setHeadValue(String headValue) {
this.headValue = headValue;
}
@Override
public boolean hasNext() {
return next != null;
}
@Override
public CarrierItem next() {
return next;
}
@Override
public void remove() {
}
}
......@@ -59,13 +59,18 @@ public class ContextCarrier implements Serializable {
*/
private DistributedTraceId primaryDistributedTraceId;
public CarrierItem items() {
SW3CarrierItem carrierItem = new SW3CarrierItem(this, null);
return carrierItem;
}
/**
* Serialize this {@link ContextCarrier} to a {@link String},
* with '|' split.
*
* @return the serialization string.
*/
public String serialize() {
String serialize() {
if (this.isValid()) {
return StringUtil.join('|',
this.getTraceSegmentId().encode(),
......@@ -75,7 +80,7 @@ public class ContextCarrier implements Serializable {
this.getPeerHost(),
this.getEntryOperationName(),
this.getParentOperationName(),
this.getPrimaryDistributedTraceId().toBase64());
this.getPrimaryDistributedTraceId().encode());
} else {
return "";
}
......@@ -86,7 +91,7 @@ public class ContextCarrier implements Serializable {
*
* @param text carries {@link #traceSegmentId} and {@link #spanId}, with '|' split.
*/
public ContextCarrier deserialize(String text) {
ContextCarrier deserialize(String text) {
if (text != null) {
String[] parts = text.split("\\|", 8);
if (parts.length == 8) {
......
package org.skywalking.apm.agent.core.context;
/**
* @author wusheng
*/
public class SW3CarrierItem extends CarrierItem {
private static final String HEAD_NAME = "sw3";
private ContextCarrier carrier;
public SW3CarrierItem(ContextCarrier carrier, CarrierItem next) {
super(HEAD_NAME, carrier.serialize(), next);
this.carrier = carrier;
}
@Override
public void setHeadValue(String headValue) {
carrier.deserialize(headValue);
}
}
package org.skywalking.apm.agent.core.context;
/**
* @author wusheng
*/
public class TraceContextCarrierItem extends CarrierItem {
private static final String HEAD_NAME = "Trace-Context";
public TraceContextCarrierItem(String headValue, CarrierItem next) {
super(HEAD_NAME, headValue, next);
}
}
......@@ -26,7 +26,7 @@ public abstract class DistributedTraceId {
this.id = new ID(id);
}
public String toBase64() {
public String encode() {
return id.encode();
}
......
package org.skywalking.apm.agent.core.context.ids;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.skywalking.apm.agent.core.context.ids.base64.Base64;
import org.skywalking.apm.network.proto.UniqueId;
/**
* @author wusheng
*/
public class ID {
private static final Base64.Encoder ENCODER = Base64.getEncoder();
private static final Base64.Decoder DECODER = Base64.getDecoder();
private long part1;
private long part2;
private long part3;
......@@ -25,31 +19,15 @@ public class ID {
}
public ID(String encodingString) {
String[] idParts = encodingString.split(".", 3);
int index = 0;
for (int part = 0; part < 3; part++) {
String encodedString;
char potentialTypeChar = encodingString.charAt(index);
long value;
if (potentialTypeChar == '#') {
encodedString = encodingString.substring(index + 1, index + 5);
index += 5;
value = ByteBuffer.wrap(DECODER.decode(encodedString)).order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().get(0);
} else if (potentialTypeChar == '$') {
encodedString = encodingString.substring(index + 1, index + 9);
index += 9;
value = ByteBuffer.wrap(DECODER.decode(encodedString)).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer().get(0);
} else {
encodedString = encodingString.substring(index, index + 12);
index += 12;
value = ByteBuffer.wrap(DECODER.decode(encodedString)).order(ByteOrder.LITTLE_ENDIAN).asLongBuffer().get(0);
}
if (part == 0) {
part1 = value;
part1 = Long.parseLong(idParts[part]);
} else if (part == 1) {
part2 = value;
part2 = Long.parseLong(idParts[part]);
} else {
part3 = value;
part3 = Long.parseLong(idParts[part]);
}
}
......@@ -57,36 +35,11 @@ public class ID {
public String encode() {
if (encoding == null) {
encoding = long2Base64(part1) + long2Base64(part2) + long2Base64(part3);
encoding = toString();
}
return encoding;
}
private String long2Base64(long partN) {
if (partN < 0) {
throw new IllegalArgumentException("negative value.");
}
if (partN < 32768) {
// 0 - 32767
// "#" as a prefix of a short value with base64 encoding.
byte[] data = new byte[2];
ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().put((short)partN);
return '#' + ENCODER.encodeToString(data);
} else if (partN <= 2147483647) {
// 32768 - 2147483647
// "$" as a prefix of an integer value (greater than a short) with base64 encoding.
byte[] data = new byte[4];
ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer().put((int)partN);
return '$' + ENCODER.encodeToString(data);
} else {
// > 2147483647
// a long value (greater than an integer)
byte[] data = new byte[8];
ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN).asLongBuffer().put(partN);
return ENCODER.encodeToString(data);
}
}
@Override public String toString() {
return part1 + "." + part2 + '.' + part3;
}
......
......@@ -6,7 +6,7 @@ import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import java.lang.reflect.Method;
import org.skywalking.apm.agent.core.conf.Config;
import org.skywalking.apm.agent.core.context.CarrierItem;
import org.skywalking.apm.agent.core.context.ContextCarrier;
import org.skywalking.apm.agent.core.context.ContextManager;
import org.skywalking.apm.agent.core.context.tag.Tags;
......@@ -16,28 +16,20 @@ import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.skywalking.apm.network.trace.component.ComponentsDefine;
import org.skywalking.apm.plugin.dubbox.BugFixActive;
import org.skywalking.apm.plugin.dubbox.SWBaseBean;
/**
* {@link DubboInterceptor} define how to enhance class {@link com.alibaba.dubbo.monitor.support.MonitorFilter#invoke(Invoker,
* Invocation)}. the trace context transport to the provider side by {@link RpcContext#attachments}.but all the version
* of dubbo framework below 2.8.3 don't support {@link RpcContext#attachments}, we support another way to support it. it
* is that all request parameters of dubbo service need to extend {@link SWBaseBean}, and {@link DubboInterceptor} will
* inject the trace context data to the {@link SWBaseBean} bean and extract the trace context data from {@link
* SWBaseBean}, or the trace context data will not transport to the provider side.
* of dubbo framework below 2.8.3 don't support {@link RpcContext#attachments}, we support another way to support it.
*
* @author zhangxin
*/
public class DubboInterceptor implements InstanceMethodsAroundInterceptor {
/**
* <h2>Consumer:</h2> The serialized trace context data will inject the first param that extend {@link SWBaseBean}
* of dubbo service if the method {@link BugFixActive#active()} be called. or the serialized context data will
* <h2>Consumer:</h2> The serialized trace context data will
* inject to the {@link RpcContext#attachments} for transport to provider side.
* <p>
* <h2>Provider:</h2> The serialized trace context data will extract from the first param that extend {@link
* SWBaseBean} of dubbo service if the method {@link BugFixActive#active()} be called. or it will extract from
* <h2>Provider:</h2> The serialized trace context data will extract from
* {@link RpcContext#attachments}. current trace segment will ref if the serialize context data is not null.
*/
@Override
......@@ -56,20 +48,19 @@ public class DubboInterceptor implements InstanceMethodsAroundInterceptor {
if (isConsumer) {
final ContextCarrier contextCarrier = new ContextCarrier();
span = ContextManager.createExitSpan(generateOperationName(requestURL, invocation), contextCarrier, host + ":" + port);
if (!BugFixActive.isActive()) {
//invocation.getAttachments().put("contextData", contextDataStr);
//@see https://github.com/alibaba/dubbo/blob/dubbo-2.5.3/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcInvocation.java#L154-L161
rpcContext.getAttachments().put(Config.Plugin.Propagation.HEADER_NAME, contextCarrier.serialize());
} else {
fix283SendNoAttachmentIssue(invocation, contextCarrier);
//invocation.getAttachments().put("contextData", contextDataStr);
//@see https://github.com/alibaba/dubbo/blob/dubbo-2.5.3/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcInvocation.java#L154-L161
CarrierItem items = contextCarrier.items();
while (items.hasNext()) {
CarrierItem next = items.next();
rpcContext.getAttachments().put(next.getHeadKey(), next.getHeadValue());
}
} else {
ContextCarrier contextCarrier;
if (!BugFixActive.isActive()) {
contextCarrier = new ContextCarrier().deserialize(rpcContext.getAttachment(Config.Plugin.Propagation.HEADER_NAME));
} else {
contextCarrier = fix283RecvNoAttachmentIssue(invocation);
ContextCarrier contextCarrier = new ContextCarrier();
CarrierItem items = contextCarrier.items();
while (items.hasNext()) {
CarrierItem next = items.next();
next.setHeadValue(rpcContext.getAttachment(next.getHeadKey()));
}
span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), contextCarrier);
......@@ -143,33 +134,4 @@ public class DubboInterceptor implements InstanceMethodsAroundInterceptor {
requestURL.append(generateOperationName(url, invocation));
return requestURL.toString();
}
/**
* Set the trace context.
*
* @param contextCarrier {@link ContextCarrier}.
*/
private void fix283SendNoAttachmentIssue(Invocation invocation, ContextCarrier contextCarrier) {
for (Object parameter : invocation.getArguments()) {
if (parameter instanceof SWBaseBean) {
((SWBaseBean)parameter).setTraceContext(contextCarrier.serialize());
return;
}
}
}
/**
* Fetch the trace context by using {@link Invocation#getArguments()}.
*
* @return trace context data.
*/
private ContextCarrier fix283RecvNoAttachmentIssue(Invocation invocation) {
for (Object parameter : invocation.getArguments()) {
if (parameter instanceof SWBaseBean) {
return new ContextCarrier().deserialize(((SWBaseBean)parameter).getTraceContext());
}
}
return null;
}
}
package org.skywalking.apm.plugin.dubbox;
/**
* {@link BugFixActive#active} is an flag that present the dubbox version is below 2.8.3, The version 2.8.3 of dubbox
* don't support attachment. so skywalking provided another way to support the function that transport the serialized
* context data. The way is that all parameters of dubbo service need to extend {@link SWBaseBean}, {@link
* org.skywalking.apm.plugin.dubbo.DubboInterceptor} fetch the serialized context data by using {@link
* SWBaseBean#getTraceContext()}.
*
* @author zhangxin
*/
public final class BugFixActive {
private static boolean ACTIVE = false;
/**
* Set active status, before startup dubbo remote.
*/
public static void active() {
BugFixActive.ACTIVE = true;
}
public static boolean isActive() {
return BugFixActive.ACTIVE;
}
}
package org.skywalking.apm.plugin.dubbox;
import java.io.Serializable;
/**
* All the request parameter of dubbox service need to extend {@link SWBaseBean} to transport
* the serialized trace context to the provider side if the version of dubbox is below 2.8.3.
*
* @author zhangxin
*/
public class SWBaseBean implements Serializable {
/**
* Serialized trace context.
*/
private String traceContext;
public String getTraceContext() {
return traceContext;
}
public void setTraceContext(String traceContext) {
this.traceContext = traceContext;
}
}
......@@ -7,7 +7,6 @@ import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import java.util.List;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
......
......@@ -6,7 +6,7 @@ import java.net.URL;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.skywalking.apm.agent.core.conf.Config;
import org.skywalking.apm.agent.core.context.CarrierItem;
import org.skywalking.apm.agent.core.context.ContextCarrier;
import org.skywalking.apm.agent.core.context.ContextManager;
import org.skywalking.apm.agent.core.context.tag.Tags;
......@@ -42,7 +42,11 @@ public class HttpClientExecuteInterceptor implements InstanceMethodsAroundInterc
Tags.HTTP.METHOD.set(span, httpRequest.getRequestLine().getMethod());
SpanLayer.asHttp(span);
httpRequest.setHeader(Config.Plugin.Propagation.HEADER_NAME, contextCarrier.serialize());
CarrierItem items = contextCarrier.items();
while (items.hasNext()) {
CarrierItem next = items.next();
httpRequest.setHeader(next.getHeadKey(), next.getHeadValue());
}
}
@Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
......
......@@ -4,7 +4,7 @@ import com.weibo.api.motan.rpc.Request;
import com.weibo.api.motan.rpc.Response;
import com.weibo.api.motan.rpc.URL;
import java.lang.reflect.Method;
import org.skywalking.apm.agent.core.conf.Config;
import org.skywalking.apm.agent.core.context.CarrierItem;
import org.skywalking.apm.agent.core.context.ContextCarrier;
import org.skywalking.apm.agent.core.context.ContextManager;
import org.skywalking.apm.agent.core.context.tag.Tags;
......@@ -42,7 +42,11 @@ public class MotanConsumerInterceptor implements InstanceConstructorInterceptor,
span.setComponent(ComponentsDefine.MOTAN);
Tags.URL.set(span, url.getIdentity());
SpanLayer.asRPCFramework(span);
request.setAttachment(Config.Plugin.Propagation.HEADER_NAME, contextCarrier.serialize());
CarrierItem items = contextCarrier.items();
while(items.hasNext()){
CarrierItem next = items.next();
request.setAttachment(next.getHeadKey(), next.getHeadValue());
}
}
}
......
......@@ -3,7 +3,7 @@ package org.skywalking.apm.plugin.motan;
import com.weibo.api.motan.rpc.Request;
import com.weibo.api.motan.rpc.Response;
import java.lang.reflect.Method;
import org.skywalking.apm.agent.core.conf.Config;
import org.skywalking.apm.agent.core.context.CarrierItem;
import org.skywalking.apm.agent.core.context.ContextCarrier;
import org.skywalking.apm.agent.core.context.ContextManager;
import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
......@@ -27,8 +27,13 @@ public class MotanProviderInterceptor implements InstanceMethodsAroundIntercepto
@Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
Request request = (Request)allArguments[0];
String serializedContextData = request.getAttachments().get(Config.Plugin.Propagation.HEADER_NAME);
ContextCarrier contextCarrier = new ContextCarrier().deserialize(serializedContextData);
ContextCarrier contextCarrier = new ContextCarrier();
CarrierItem items = contextCarrier.items();
while(items.hasNext()){
CarrierItem next = items.next();
next.setHeadValue(request.getAttachments().get(next.getHeadKey()));
}
AbstractSpan span = ContextManager.createEntrySpan(generateViewPoint(request), contextCarrier);
SpanLayer.asRPCFramework(span);
span.setComponent(ComponentsDefine.MOTAN);
......
......@@ -3,7 +3,7 @@ package org.skywalking.apm.plugin.tomcat78x;
import java.lang.reflect.Method;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.skywalking.apm.agent.core.conf.Config;
import org.skywalking.apm.agent.core.context.CarrierItem;
import org.skywalking.apm.agent.core.context.ContextCarrier;
import org.skywalking.apm.agent.core.context.ContextManager;
import org.skywalking.apm.agent.core.context.tag.Tags;
......@@ -16,9 +16,9 @@ import org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptR
import org.skywalking.apm.network.trace.component.ComponentsDefine;
/**
* {@link TomcatInvokeInterceptor} fetch the serialized context data by using {@link HttpServletRequest#getHeader(String)}.
* The {@link TraceSegment#refs} of current trace segment will reference to the trace
* segment id of the previous level if the serialized context is not null.
* {@link TomcatInvokeInterceptor} fetch the serialized context data by using {@link
* HttpServletRequest#getHeader(String)}. The {@link TraceSegment#refs} of current trace segment will reference to the
* trace segment id of the previous level if the serialized context is not null.
*/
public class TomcatInvokeInterceptor implements InstanceMethodsAroundInterceptor {
......@@ -36,8 +36,14 @@ public class TomcatInvokeInterceptor implements InstanceMethodsAroundInterceptor
@Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
HttpServletRequest request = (HttpServletRequest)allArguments[0];
String tracingHeaderValue = request.getHeader(Config.Plugin.Propagation.HEADER_NAME);
ContextCarrier contextCarrier = new ContextCarrier().deserialize(tracingHeaderValue);
ContextCarrier contextCarrier = new ContextCarrier();
CarrierItem items = contextCarrier.items();
while (items.hasNext()) {
CarrierItem next = items.next();
next.setHeadValue(request.getHeader(next.getHeadKey()));
}
AbstractSpan span = ContextManager.createEntrySpan(request.getRequestURI(), contextCarrier);
Tags.URL.set(span, request.getRequestURL().toString());
Tags.HTTP.METHOD.set(span, request.getMethod());
......
......@@ -4,7 +4,6 @@ import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
......@@ -18,7 +17,6 @@ import org.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.skywalking.apm.agent.core.context.trace.TraceSegmentRef;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.skywalking.apm.agent.test.helper.SegmentHelper;
import org.skywalking.apm.agent.test.helper.SegmentRefHelper;
import org.skywalking.apm.agent.test.tools.AgentServiceRule;
import org.skywalking.apm.agent.test.tools.SegmentStorage;
import org.skywalking.apm.agent.test.tools.SegmentStoragePoint;
......@@ -34,7 +32,6 @@ import org.skywalking.apm.toolkit.activation.opentracing.tracer.SkywalkingTracer
import org.skywalking.apm.toolkit.opentracing.SkywalkingContinuation;
import org.skywalking.apm.toolkit.opentracing.SkywalkingSpan;
import org.skywalking.apm.toolkit.opentracing.SkywalkingSpanBuilder;
import org.skywalking.apm.toolkit.opentracing.SkywalkingTracer;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册