提交 a0f28fa9 编写于 作者: X Xin,Zhang 提交者: wu-sheng

Change the intercept point of block call way (#1190)

* Change the buired point of block call way

* Support future way and change the operation name of span
上级 002b6517
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.grpc.v1;
import io.grpc.Channel;
import io.grpc.MethodDescriptor;
import java.lang.reflect.Method;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.StaticMethodsAroundInterceptor;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import static org.apache.skywalking.apm.plugin.grpc.v1.OperationNameFormatUtil.formatOperationName;
/**
* @author zhang xin
*/
public class AsyncUnaryRequestCallCallInterceptor implements StaticMethodsAroundInterceptor {
@Override public void beforeMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
MethodInterceptResult result) {
BlockingCallClientInterceptor originClientCall = (BlockingCallClientInterceptor)allArguments[0];
Channel channel = originClientCall.getChannel();
MethodDescriptor methodDescriptor = originClientCall.getMethodDescriptor();
final AbstractSpan span = ContextManager.createExitSpan(formatOperationName(methodDescriptor), channel.authority());
span.setComponent(ComponentsDefine.GRPC);
SpanLayer.asRPCFramework(span);
}
@Override public Object afterMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
Object ret) {
ContextManager.stopSpan();
return ret;
}
@Override
public void handleMethodException(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
......@@ -21,38 +21,27 @@ package org.apache.skywalking.apm.plugin.grpc.v1;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import static org.apache.skywalking.apm.plugin.grpc.v1.OperationNameFormatUtil.formatOperationName;
/**
* @author zhang xin
*/
public class BlockingCallClientInterceptor extends ForwardingClientCall.SimpleForwardingClientCall {
private final String serviceName;
private final String remotePeer;
private final MethodDescriptor methodDescriptor;
private final Channel channel;
public BlockingCallClientInterceptor(ClientCall delegate, MethodDescriptor method, Channel channel) {
super(delegate);
this.serviceName = formatOperationName(method);
this.remotePeer = channel.authority();
this.methodDescriptor = method;
this.channel = channel;
}
@Override public void start(Listener responseListener, Metadata headers) {
final AbstractSpan span = ContextManager.createExitSpan(serviceName, remotePeer);
span.setComponent(ComponentsDefine.GRPC);
SpanLayer.asRPCFramework(span);
final ContextCarrier contextCarrier = new ContextCarrier();
ContextManager.inject(contextCarrier);
CarrierItem contextItem = contextCarrier.items();
......@@ -61,35 +50,14 @@ public class BlockingCallClientInterceptor extends ForwardingClientCall.SimpleFo
Metadata.Key<String> headerKey = Metadata.Key.of(contextItem.getHeadKey(), Metadata.ASCII_STRING_MARSHALLER);
headers.put(headerKey, contextItem.getHeadValue());
}
delegate().start(new CallListener(responseListener), headers);
delegate().start(responseListener, headers);
}
private class CallListener extends ForwardingClientCallListener.SimpleForwardingClientCallListener {
protected CallListener(Listener delegate) {
super(delegate);
}
@Override public void onReady() {
delegate().onReady();
}
@Override public void onClose(Status status, Metadata trailers) {
delegate().onClose(status, trailers);
if (!status.isOk()) {
AbstractSpan activeSpan = ContextManager.activeSpan();
activeSpan.errorOccurred().log(status.getCause());
Tags.STATUS_CODE.set(activeSpan, status.getCode().name());
}
ContextManager.stopSpan();
}
@Override
public void onMessage(Object message) {
delegate().onMessage(message);
}
public MethodDescriptor getMethodDescriptor() {
return methodDescriptor;
}
@Override public void onHeaders(Metadata headers) {
delegate().onHeaders(headers);
}
public Channel getChannel() {
return channel;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.grpc.v1;
import io.grpc.Channel;
import io.grpc.MethodDescriptor;
import java.lang.reflect.Method;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.StaticMethodsAroundInterceptor;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import static org.apache.skywalking.apm.plugin.grpc.v1.OperationNameFormatUtil.formatOperationName;
/**
* @author zhang xin
*/
public class BlockingCallInterceptor implements StaticMethodsAroundInterceptor {
@Override public void beforeMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
MethodInterceptResult result) {
Channel channel = (Channel)allArguments[0];
MethodDescriptor methodDescriptor = (MethodDescriptor)allArguments[1];
final AbstractSpan span = ContextManager.createExitSpan(formatOperationName(methodDescriptor), channel.authority());
span.setComponent(ComponentsDefine.GRPC);
SpanLayer.asRPCFramework(span);
}
@Override public Object afterMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
Object ret) {
ContextManager.stopSpan();
return ret;
}
@Override
public void handleMethodException(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
......@@ -30,11 +30,15 @@ import java.util.Map;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.util.StringUtil;
import static org.apache.skywalking.apm.plugin.grpc.v1.Constants.STREAM_ON_NEXT_OPERATION_NAME;
import static org.apache.skywalking.apm.plugin.grpc.v1.Constants.SERVER;
import static org.apache.skywalking.apm.plugin.grpc.v1.Constants.STREAM_REQUEST_OBSERVER_ON_COMPLETE_OPERATION_NAME;
import static org.apache.skywalking.apm.plugin.grpc.v1.Constants.STREAM_REQUEST_OBSERVER_ON_ERROR_OPERATION_NAME;
import static org.apache.skywalking.apm.plugin.grpc.v1.Constants.STREAM_REQUEST_OBSERVER_ON_NEXT_OPERATION_NAME;
/**
* @author zhang xin
......@@ -62,19 +66,30 @@ public class CallServerInterceptor implements ServerInterceptor {
final AbstractSpan span = ContextManager.createEntrySpan(OperationNameFormatUtil.formatOperationName(call.getMethodDescriptor()), contextCarrier);
span.setComponent(ComponentsDefine.GRPC);
return new ServerCallListener(handler.startCall(new ForwardingServerCall.SimpleForwardingServerCall(call) {
@Override
public void sendHeaders(Metadata responseHeaders) {
delegate().sendHeaders(responseHeaders);
}
}, headers), call.getMethodDescriptor());
try {
return new ServerCallListener(handler.startCall(new ForwardingServerCall.SimpleForwardingServerCall(call) {
@Override
public void sendHeaders(Metadata responseHeaders) {
delegate().sendHeaders(responseHeaders);
}
}, headers), call.getMethodDescriptor(), ContextManager.capture());
} finally {
ContextManager.stopSpan();
}
}
public class ServerCallListener extends ForwardingServerCallListener.SimpleForwardingServerCallListener {
protected ServerCallListener(ServerCall.Listener delegate, MethodDescriptor descriptor) {
private final ContextSnapshot contextSnapshot;
private final MethodDescriptor.MethodType methodType;
private final String operationPrefix;
protected ServerCallListener(ServerCall.Listener delegate, MethodDescriptor descriptor,
ContextSnapshot contextSnapshot) {
super(delegate);
this.contextSnapshot = contextSnapshot;
this.methodType = descriptor.getType();
this.operationPrefix = OperationNameFormatUtil.formatOperationName(descriptor) + SERVER;
}
@Override public void onReady() {
......@@ -83,7 +98,8 @@ public class CallServerInterceptor implements ServerInterceptor {
@Override public void onMessage(Object message) {
try {
ContextManager.createLocalSpan(STREAM_ON_NEXT_OPERATION_NAME);
ContextManager.createLocalSpan(operationPrefix + STREAM_REQUEST_OBSERVER_ON_NEXT_OPERATION_NAME);
ContextManager.continued(contextSnapshot);
delegate().onMessage(message);
} catch (Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
......@@ -93,13 +109,35 @@ public class CallServerInterceptor implements ServerInterceptor {
}
@Override public void onComplete() {
delegate().onComplete();
ContextManager.stopSpan();
if (methodType != MethodDescriptor.MethodType.UNARY) {
try {
ContextManager.createLocalSpan(operationPrefix + STREAM_REQUEST_OBSERVER_ON_COMPLETE_OPERATION_NAME);
ContextManager.continued(contextSnapshot);
delegate().onComplete();
} catch (Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
} finally {
ContextManager.stopSpan();
}
} else {
delegate().onComplete();
}
}
@Override public void onCancel() {
delegate().onCancel();
ContextManager.stopSpan();
if (methodType != MethodDescriptor.MethodType.UNARY) {
try {
ContextManager.createLocalSpan(operationPrefix + STREAM_REQUEST_OBSERVER_ON_ERROR_OPERATION_NAME);
ContextManager.continued(contextSnapshot);
delegate().onCancel();
} catch (Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
} finally {
ContextManager.stopSpan();
}
} else {
delegate().onCancel();
}
}
@Override public void onHalfClose() {
......
......@@ -24,7 +24,20 @@ package org.apache.skywalking.apm.plugin.grpc.v1;
* @author zhang xin
*/
public class Constants {
public static final String STREAM_ON_READY_OPERATION_NAME = "RequestStreamObserver/onReady";
public static final String CLIENT = "/client";
public static final String SERVER = "/server";
public static final String STREAM_REQUEST_OBSERVER_ON_NEXT_OPERATION_NAME = "/RequestObserver/onNext";
public static final String STREAM_REQUEST_OBSERVER_ON_ERROR_OPERATION_NAME = "/RequestObserver/onError";
public static final String STREAM_REQUEST_OBSERVER_ON_COMPLETE_OPERATION_NAME = "/RequestObserver/onComplete";
public static final String STREAM_RESPONSE_OBSERVER_ON_NEXT_OPERATION_NAME = "/ResponseObserver/onNext";
public static final String STREAM_RESPONSE_OBSERVER_ON_ERROR_OPERATION_NAME = "/ResponseObserver/onError";
public static final String STREAM_RESPONSE_OBSERVER_ON_COMPLETE_OPERATION_NAME = "/ResponseObserver/onComplete";
public static final String STREAM_ON_NEXT_OPERATION_NAME = "ResponseStreamObserver/OnNext";
}
......@@ -34,8 +34,10 @@ import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import static org.apache.skywalking.apm.plugin.grpc.v1.Constants.STREAM_ON_NEXT_OPERATION_NAME;
import static org.apache.skywalking.apm.plugin.grpc.v1.Constants.STREAM_ON_READY_OPERATION_NAME;
import static org.apache.skywalking.apm.plugin.grpc.v1.Constants.CLIENT;
import static org.apache.skywalking.apm.plugin.grpc.v1.Constants.STREAM_RESPONSE_OBSERVER_ON_COMPLETE_OPERATION_NAME;
import static org.apache.skywalking.apm.plugin.grpc.v1.Constants.STREAM_RESPONSE_OBSERVER_ON_ERROR_OPERATION_NAME;
import static org.apache.skywalking.apm.plugin.grpc.v1.Constants.STREAM_RESPONSE_OBSERVER_ON_NEXT_OPERATION_NAME;
import static org.apache.skywalking.apm.plugin.grpc.v1.OperationNameFormatUtil.formatOperationName;
/**
......@@ -45,11 +47,13 @@ public class StreamCallClientInterceptor extends ForwardingClientCall.SimpleForw
private final String serviceName;
private final String remotePeer;
private final String operationPrefix;
protected StreamCallClientInterceptor(ClientCall delegate, MethodDescriptor method, Channel channel) {
super(delegate);
this.serviceName = formatOperationName(method);
this.remotePeer = channel.authority();
this.operationPrefix = OperationNameFormatUtil.formatOperationName(method) + CLIENT;
}
@Override
......@@ -78,10 +82,6 @@ public class StreamCallClientInterceptor extends ForwardingClientCall.SimpleForw
}
@Override public void onReady() {
final AbstractSpan span = ContextManager.createLocalSpan(STREAM_ON_READY_OPERATION_NAME);
span.setComponent(ComponentsDefine.GRPC);
SpanLayer.asRPCFramework(span);
ContextManager.continued(contextSnapshot);
delegate().onReady();
}
......@@ -91,7 +91,8 @@ public class StreamCallClientInterceptor extends ForwardingClientCall.SimpleForw
@Override public void onMessage(Object message) {
try {
ContextManager.createLocalSpan(STREAM_ON_NEXT_OPERATION_NAME);
ContextManager.createLocalSpan(operationPrefix + STREAM_RESPONSE_OBSERVER_ON_NEXT_OPERATION_NAME);
ContextManager.continued(contextSnapshot);
delegate().onMessage(message);
} catch (Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
......@@ -101,14 +102,21 @@ public class StreamCallClientInterceptor extends ForwardingClientCall.SimpleForw
}
@Override public void onClose(Status status, Metadata trailers) {
delegate().onClose(status, trailers);
if (!status.isOk()) {
AbstractSpan activeSpan = ContextManager.activeSpan();
activeSpan.errorOccurred().log(status.getCause());
Tags.STATUS_CODE.set(activeSpan, status.getCode().name());
try {
if (!status.isOk()) {
AbstractSpan abstractSpan = ContextManager.createLocalSpan(operationPrefix + STREAM_RESPONSE_OBSERVER_ON_ERROR_OPERATION_NAME);
abstractSpan.errorOccurred().log(status.getCause());
Tags.STATUS_CODE.set(abstractSpan, status.getCode().name());
} else {
AbstractSpan abstractSpan = ContextManager.createLocalSpan(operationPrefix + STREAM_RESPONSE_OBSERVER_ON_COMPLETE_OPERATION_NAME);
}
delegate().onClose(status, trailers);
ContextManager.continued(contextSnapshot);
} catch (Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
} finally {
ContextManager.stopSpan();
}
ContextManager.stopSpan();
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.grpc.v1.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassStaticMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* @author zhang xin
*/
public class ClientCallsInstrumentation extends ClassStaticMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "io.grpc.stub.ClientCalls";
private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.grpc.v1.BlockingCallInterceptor";
private static final String FUTURE_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.grpc.v1.AsyncUnaryRequestCallCallInterceptor";
@Override protected StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
return new StaticMethodsInterceptPoint[] {
new StaticMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("blockingUnaryCall").and(takesArgumentWithType(1, "io.grpc.MethodDescriptor"));
}
@Override public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override public boolean isOverrideArgs() {
return false;
}
},
// new StaticMethodsInterceptPoint() {
// @Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
// return named("blockingServerStreamingCall").and(takesArgumentWithType(1, "io.grpc.MethodDescriptor"));
// }
//
// @Override public String getMethodsInterceptor() {
// return INTERCEPTOR_CLASS;
// }
//
// @Override public boolean isOverrideArgs() {
// return false;
// }
// },
new StaticMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("asyncUnaryRequestCall").and(takesArgumentWithType(2, "io.grpc.ClientCall$Listener"));
}
@Override public String getMethodsInterceptor() {
return FUTURE_INTERCEPTOR_CLASS;
}
@Override public boolean isOverrideArgs() {
return true;
}
}
};
}
@Override protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
}
......@@ -15,4 +15,5 @@
# limitations under the License.
grpc-1.x=org.apache.skywalking.apm.plugin.grpc.v1.define.AbstractStubInstrumentation
grpc-1.x=org.apache.skywalking.apm.plugin.grpc.v1.define.AbstractServerImplBuilderInstrumentation
\ No newline at end of file
grpc-1.x=org.apache.skywalking.apm.plugin.grpc.v1.define.AbstractServerImplBuilderInstrumentation
grpc-1.x=org.apache.skywalking.apm.plugin.grpc.v1.define.ClientCallsInstrumentation
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册