提交 da2189e5 编写于 作者: X XiaoFu 提交者: wu-sheng

[Agent] Add plugin for ActiveMQ 5.x (#1513)

* add activemq-plugin

* fix  update ActiveMQConsumerInstrumentation Enhance another class

* refacor ActiveMQConsumerInstrumentation code

* support both  topic and queue

* clear the code

* add UnitTest add fix bug

* fix obtain url  bug

* Refactor the ActiveMQProducerInstrumentation code

* fix bug

* fix ConstructorInterceptor BUG

* add Licensed

* Update application.yml

Restore configuration

* Restore configuration

* perfect test

* fix

* fix License

* fix  Project Files

* fix import class in instrumentation

* fix test case

* fix java doc

* fix  Trigger CI

* fix ActiveMQConsumerAndProducerConstructorInterceptorTest

* delete unit test

* fix

* fix

* fix  folder directory problem

* Delete ActiveMQConsumerInstrumentation.java
上级 3b1ffa93
...@@ -111,4 +111,4 @@ configuration: ...@@ -111,4 +111,4 @@ configuration:
# default: # default:
# host: localhost # host: localhost
# port: 9411 # port: 9411
# contextPath: / # contextPath: /
\ No newline at end of file
...@@ -156,7 +156,15 @@ mongodb-driver: ...@@ -156,7 +156,15 @@ mongodb-driver:
SOFARPC: SOFARPC:
id: 43 id: 43
languages: Java languages: Java
ActiveMQ:
id: 44
languages: Java
activemq-producer:
id: 45
languages: Java
activemq-consumer:
id: 46
languages: Java
# .NET/.NET Core components # .NET/.NET Core components
# [3000, 4000) for C#/.NET only # [3000, 4000) for C#/.NET only
AspNetCore: AspNetCore:
...@@ -223,6 +231,8 @@ Component-Server-Mappings: ...@@ -223,6 +231,8 @@ Component-Server-Mappings:
rocketMQ-consumer: RocketMQ rocketMQ-consumer: RocketMQ
kafka-producer: Kafka kafka-producer: Kafka
kafka-consumer: Kafka kafka-consumer: Kafka
activemq-producer: ActiveMQ
activemq-consumer: ActiveMQ
postgresql-jdbc-driver: PostgreSQL postgresql-jdbc-driver: PostgreSQL
Xmemcached: Memcached Xmemcached: Memcached
Spymemcached: Memcached Spymemcached: Memcached
......
...@@ -92,6 +92,10 @@ public class ComponentsDefine { ...@@ -92,6 +92,10 @@ public class ComponentsDefine {
public static final OfficialComponent SOFARPC = new OfficialComponent(43, "SOFARPC"); public static final OfficialComponent SOFARPC = new OfficialComponent(43, "SOFARPC");
public static final OfficialComponent ACTIVEMQ_PRODUCER = new OfficialComponent(45,"activemq-producer");
public static final OfficialComponent ACTIVEMQ_CONSUMER = new OfficialComponent(46,"activemq-consumer");
private static ComponentsDefine INSTANCE = new ComponentsDefine(); private static ComponentsDefine INSTANCE = new ComponentsDefine();
private String[] components; private String[] components;
...@@ -101,7 +105,7 @@ public class ComponentsDefine { ...@@ -101,7 +105,7 @@ public class ComponentsDefine {
} }
public ComponentsDefine() { public ComponentsDefine() {
components = new String[44]; components = new String[47];
addComponent(TOMCAT); addComponent(TOMCAT);
addComponent(HTTPCLIENT); addComponent(HTTPCLIENT);
addComponent(DUBBO); addComponent(DUBBO);
...@@ -135,6 +139,10 @@ public class ComponentsDefine { ...@@ -135,6 +139,10 @@ public class ComponentsDefine {
addComponent(KAFKA_CONSUMER); addComponent(KAFKA_CONSUMER);
addComponent(MONGO_DRIVER); addComponent(MONGO_DRIVER);
addComponent(SOFARPC); addComponent(SOFARPC);
addComponent(ACTIVEMQ_PRODUCER);
addComponent(ACTIVEMQ_CONSUMER);
} }
private void addComponent(OfficialComponent component) { private void addComponent(OfficialComponent component) {
......
...@@ -59,6 +59,11 @@ public final class Tags { ...@@ -59,6 +59,11 @@ public final class Tags {
*/ */
public static final StringTag DB_BIND_VARIABLES = new StringTag("db.bind_vars"); public static final StringTag DB_BIND_VARIABLES = new StringTag("db.bind_vars");
/**
* MQ_QUEUE records the queue name of message-middleware
*/
public static final StringTag MQ_QUEUE = new StringTag("mq.queue");
/** /**
* MQ_BROKER records the broker address of message-middleware * MQ_BROKER records the broker address of message-middleware
*/ */
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-sdk-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>5.0.0-RC-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-activemq-5.x-plugin</artifactId>
<name>activemq-5.x-plugin</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.14.5</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<!-- 源码插件 -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<!-- 发布时自动将源码同时发布的配置 -->
<executions>
<execution>
<id>attach-sources</id>
<phase>none</phase>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.activemq;
import org.apache.activemq.ActiveMQSession;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
/**
* @author withlin
*/
public class ActiveMQConsumerConstructorInterceptor implements InstanceConstructorInterceptor {
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
ActiveMQSession session = (ActiveMQSession)allArguments[0];
objInst.setSkyWalkingDynamicField(session.getConnection().getTransport().getRemoteAddress().split("//")[1]);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.activemq;
import org.apache.activemq.command.MessageDispatch;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import java.lang.reflect.Method;
/**
* @author withlin
*/
public class ActiveMQConsumerInterceptor implements InstanceMethodsAroundInterceptor {
public static final String OPERATE_NAME_PREFIX = "ActiveMQ/";
public static final String CONSUMER_OPERATE_NAME_SUFFIX = "/Consumer";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
ContextCarrier contextCarrier = new ContextCarrier();
String url = (String) objInst.getSkyWalkingDynamicField();
MessageDispatch messageDispatch = (MessageDispatch) allArguments[0];
AbstractSpan activeSpan = null;
if (messageDispatch.getDestination().getDestinationType() == 1 || messageDispatch.getDestination().getDestinationType() == 5) {
activeSpan = ContextManager.createEntrySpan(OPERATE_NAME_PREFIX + "Queue/" + messageDispatch.getDestination().getPhysicalName() + CONSUMER_OPERATE_NAME_SUFFIX, null).start(System.currentTimeMillis());
Tags.MQ_BROKER.set(activeSpan, url);
Tags.MQ_QUEUE.set(activeSpan, messageDispatch.getDestination().getPhysicalName());
} else if (messageDispatch.getDestination().getDestinationType() == 2 || messageDispatch.getDestination().getDestinationType() == 6) {
activeSpan = ContextManager.createEntrySpan(OPERATE_NAME_PREFIX + "Topic/" + messageDispatch.getDestination().getPhysicalName() + CONSUMER_OPERATE_NAME_SUFFIX, null).start(System.currentTimeMillis());
Tags.MQ_BROKER.set(activeSpan, url);
Tags.MQ_TOPIC.set(activeSpan, messageDispatch.getDestination().getPhysicalName());
}
activeSpan.setComponent(ComponentsDefine.ACTIVEMQ_CONSUMER);
SpanLayer.asMQ(activeSpan);
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
next.setHeadValue(messageDispatch.getMessage().getProperty(next.getHeadKey()).toString());
}
ContextManager.extract(contextCarrier);
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.activemq;
import org.apache.activemq.ActiveMQSession;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
/**
* @author withlin
*/
public class ActiveMQProducerConstructorInterceptor implements InstanceConstructorInterceptor {
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
ActiveMQSession session = (ActiveMQSession)allArguments[0];
objInst.setSkyWalkingDynamicField(session.getConnection().getTransport().getRemoteAddress().split("//")[1]);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.activemq;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import javax.jms.Message;
import java.lang.reflect.Method;
/**
* @author withlin
*/
public class ActiveMQProducerInterceptor implements InstanceMethodsAroundInterceptor {
public static final String OPERATE_NAME_PREFIX = "ActiveMQ/";
public static final String PRODUCER_OPERATE_NAME_SUFFIX = "/Producer";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
ContextCarrier contextCarrier = new ContextCarrier();
ActiveMQDestination activeMQDestination = (ActiveMQDestination) allArguments[0];
Message message = (Message) allArguments[1];
String url = (String) objInst.getSkyWalkingDynamicField();
AbstractSpan activeSpan = null;
if (activeMQDestination.getDestinationType() == 1 || activeMQDestination.getDestinationType() == 5) {
activeSpan = ContextManager.createExitSpan(OPERATE_NAME_PREFIX + "Queue/" + activeMQDestination.getPhysicalName() + PRODUCER_OPERATE_NAME_SUFFIX, contextCarrier, url);
Tags.MQ_BROKER.set(activeSpan,url);
Tags.MQ_QUEUE.set(activeSpan,activeMQDestination.getPhysicalName());
} else if (activeMQDestination.getDestinationType() == 2 || activeMQDestination.getDestinationType() == 6) {
activeSpan = ContextManager.createExitSpan(OPERATE_NAME_PREFIX + "Topic/" + activeMQDestination.getPhysicalName() + PRODUCER_OPERATE_NAME_SUFFIX, contextCarrier, url);
Tags.MQ_BROKER.set(activeSpan, url);
Tags.MQ_TOPIC.set(activeSpan,activeMQDestination.getPhysicalName());
}
SpanLayer.asMQ(activeSpan);
activeSpan.setComponent(ComponentsDefine.ACTIVEMQ_PRODUCER);
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
message.setStringProperty(next.getHeadKey(),next.getHeadValue());
}
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.activemq.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.agent.core.plugin.match.MultiClassNameMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
/**
* {@link ActiveMQConsumerInstrumentation} presents that skywalking intercepts {@link org.apache.activemq.ActiveMQMessageConsumer}.
*
* @author withlin
*/
public class ActiveMQConsumerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.activemq.ActiveMQConsumerInterceptor";
public static final String ENHANCE_CLASS_CONSUMER = "org.apache.activemq.ActiveMQMessageConsumer";
public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.activemq.ActiveMQConsumerConstructorInterceptor";
public static final String ENHANCE_METHOD_DISPATCH = "dispatch";
public static final String CONSTRUCTOR_INTERCEPT_TYPE = "org.apache.activemq.ActiveMQSession";
@Override
protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(0,CONSTRUCTOR_INTERCEPT_TYPE);
}
@Override public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
};
}
@Override
protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_METHOD_DISPATCH);
}
@Override public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override
protected ClassMatch enhanceClass() {
return MultiClassNameMatch.byMultiClassMatch(ENHANCE_CLASS_CONSUMER);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.activemq.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.agent.core.plugin.match.MultiClassNameMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
/**
* {@link ActiveMQProducerInstrumentation} presents that skywalking intercepts {@link org.apache.activemq.ActiveMQMessageProducer}.
*
* @author withlin
*/
public class ActiveMQProducerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.activemq.ActiveMQProducerInterceptor";
public static final String ENHANCE_CLASS_PRODUCER = "org.apache.activemq.ActiveMQMessageProducer";
public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.activemq.ActiveMQProducerConstructorInterceptor";
public static final String ENHANCE_METHOD = "send";
public static final String CONSTRUCTOR_INTERCEPT_TYPE = "org.apache.activemq.ActiveMQSession";
@Override
protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(0,CONSTRUCTOR_INTERCEPT_TYPE);
}
@Override public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
};
}
@Override
protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_METHOD).and(takesArgumentWithType(0,"javax.jms.Destination")).and(takesArgumentWithType(1,"javax.jms.Message"));
}
@Override public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override
protected ClassMatch enhanceClass() {
return MultiClassNameMatch.byMultiClassMatch(ENHANCE_CLASS_PRODUCER);
}
}
\ No newline at end of file
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
activemq-5.x=org.apache.skywalking.apm.plugin.activemq.define.ActiveMQProducerInstrumentation
activemq-5.x=org.apache.skywalking.apm.plugin.activemq.define.ActiveMQConsumerInstrumentation
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.activemq;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.management.JMSStatsImpl;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import javax.jms.JMSException;
import java.io.IOException;
import java.net.URI;
import java.security.cert.X509Certificate;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
public class ActiveMQConsumerAndProducerConstructorInterceptorTest {
@Mock
private ActiveMQConnection activeMQConnection;
private IdGenerator idGenerator;
private JMSStatsImpl jmsStats;
@Mock
private ActiveMQSession activeMQSession;
private SessionId sessionId;
public class TransportTest implements Transport {
private String remoteAddress;
@Override
public void oneway(Object o) throws IOException {
}
@Override
public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws IOException {
return null;
}
@Override
public Object request(Object o) throws IOException {
return null;
}
@Override
public Object request(Object o, int i) throws IOException {
return null;
}
@Override
public TransportListener getTransportListener() {
return null;
}
@Override
public void setTransportListener(TransportListener transportListener) {
}
@Override
public <T> T narrow(Class<T> aClass) {
return null;
}
@Override
public String getRemoteAddress() {
return this.remoteAddress;
}
public String setRemoteAddress(String remoteAddress) {
return this.remoteAddress = remoteAddress;
}
@Override
public boolean isFaultTolerant() {
return false;
}
@Override
public boolean isDisposed() {
return false;
}
@Override
public boolean isConnected() {
return false;
}
@Override
public boolean isReconnectSupported() {
return false;
}
@Override
public boolean isUpdateURIsSupported() {
return false;
}
@Override
public void reconnect(URI uri) throws IOException {
}
@Override
public void updateURIs(boolean b, URI[] uris) throws IOException {
}
@Override
public int getReceiveCounter() {
return 0;
}
@Override
public X509Certificate[] getPeerCertificates() {
return new X509Certificate[0];
}
@Override
public void setPeerCertificates(X509Certificate[] x509Certificates) {
}
@Override
public WireFormat getWireFormat() {
return null;
}
@Override
public void start() throws Exception {
}
@Override
public void stop() throws Exception {
}
}
public class TestConnection extends ActiveMQConnection {
public TestConnection(Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
super(transport, clientIdGenerator, connectionIdGenerator, factoryStats);
}
}
private class TestActiveMQSession extends ActiveMQSession {
public TestActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
super(connection, sessionId, acknowledgeMode, asyncDispatch, sessionAsyncDispatch);
}
}
private ActiveMQConsumerConstructorInterceptor activeMQConsumerAndProducerConstructorInterceptor;
private EnhancedInstance enhancedInstance = new EnhancedInstance() {
private String test;
@Override
public Object getSkyWalkingDynamicField() {
return test;
}
@Override
public void setSkyWalkingDynamicField(Object value) {
test = (String)value;
}
};
@Before
public void setUp() throws Exception {
TransportTest transport = new TransportTest();
transport.setRemoteAddress("tcp://127.0.0.1:61616");
idGenerator = new IdGenerator("aaa");
jmsStats = new JMSStatsImpl();
activeMQConnection = new TestConnection(transport,idGenerator,idGenerator,jmsStats);
sessionId = new SessionId();
activeMQSession = new TestActiveMQSession(activeMQConnection,sessionId,1,true,true);
}
@Test
public void TestActiveMQConsumerAndProducerConstructorInterceptor() {
activeMQConsumerAndProducerConstructorInterceptor = new ActiveMQConsumerConstructorInterceptor();
activeMQConsumerAndProducerConstructorInterceptor.onConstruct(enhancedInstance,new Object[] {activeMQSession});
assertThat((String) enhancedInstance.getSkyWalkingDynamicField(), is("127.0.0.1:61616"));
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.activemq;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.Response;
import org.apache.activemq.state.CommandVisitor;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
import org.junit.*;
import org.junit.runner.RunWith;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import javax.jms.JMSException;
import java.io.IOException;
import java.util.List;
import static org.hamcrest.CoreMatchers.is;
import org.junit.Before;
import org.junit.Test;
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TracingSegmentRunner.class)
public class ActiveMQConsumerInterceptorTest {
@SegmentStoragePoint
private SegmentStorage segmentStorage;
@Rule
public AgentServiceRule serviceRule = new AgentServiceRule();
private ActiveMQConsumerInterceptor activeMQConsumerInterceptor;
private Object[] arguments;
private Class[] argumentType;
private MessageDispatch messageDispatch;
public class Des extends ActiveMQDestination {
@Override
protected String getQualifiedPrefix() {
return null;
}
@Override
public byte getDestinationType() {
return 1;
}
@Override
public byte getDataStructureType() {
return 0;
}
}
public class Msg extends Message {
@Override
public Message copy() {
return null;
}
@Override
public void clearBody() throws JMSException {
}
@Override
public void storeContent() {
}
@Override
public void storeContentAndClear() {
}
@Override
public Response visit(CommandVisitor commandVisitor) throws Exception {
return null;
}
@Override
public byte getDataStructureType() {
return 0;
}
}
private EnhancedInstance enhancedInstance = new EnhancedInstance() {
@Override
public Object getSkyWalkingDynamicField() {
return "localhost:60601";
}
@Override
public void setSkyWalkingDynamicField(Object value) {
}
};
@Before
public void setUp() throws IOException {
activeMQConsumerInterceptor = new ActiveMQConsumerInterceptor();
messageDispatch = new MessageDispatch();
Des des = new Des();
des.setPhysicalName("test");
messageDispatch.setDestination(des);
Message msg = new Msg();
msg.setProperty("sw3","");
messageDispatch.setMessage(msg);
arguments = new Object[] {messageDispatch};
argumentType = null;
}
@Test
public void testConsumerWithoutMessage() throws Throwable {
activeMQConsumerInterceptor.beforeMethod(enhancedInstance,null,arguments,null,null);
activeMQConsumerInterceptor.afterMethod(enhancedInstance, null, arguments, null, null);
List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
Assert.assertThat(traceSegments.size(), is(1));
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.activemq;
import java.util.List;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import java.util.Enumeration;
import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.ACTIVEMQ_PRODUCER;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TracingSegmentRunner.class)
public class ActiveMQProducerInterceptorTest {
@SegmentStoragePoint
private SegmentStorage segmentStorage;
@Rule
public AgentServiceRule serviceRule = new AgentServiceRule();
private ActiveMQProducerInterceptor producerInterceptor;
private Object[] arguments;
private Class[] argumentType;
private MQDestination mqDestination;
private Message message;
private class MQDestination extends ActiveMQDestination {
@Override
protected String getQualifiedPrefix() {
return null;
}
@Override
public byte getDestinationType() {
return 1;
}
@Override
public byte getDataStructureType() {
return 1;
}
}
public class Msg implements Message {
@Override
public String getJMSMessageID() throws JMSException {
return null;
}
@Override
public void setJMSMessageID(String s) throws JMSException {
}
@Override
public long getJMSTimestamp() throws JMSException {
return 0;
}
@Override
public void setJMSTimestamp(long l) throws JMSException {
}
@Override
public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
return new byte[0];
}
@Override
public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException {
}
@Override
public void setJMSCorrelationID(String s) throws JMSException {
}
@Override
public String getJMSCorrelationID() throws JMSException {
return null;
}
@Override
public Destination getJMSReplyTo() throws JMSException {
return null;
}
@Override
public void setJMSReplyTo(Destination destination) throws JMSException {
}
@Override
public Destination getJMSDestination() throws JMSException {
return null;
}
@Override
public void setJMSDestination(Destination destination) throws JMSException {
}
@Override
public int getJMSDeliveryMode() throws JMSException {
return 0;
}
@Override
public void setJMSDeliveryMode(int i) throws JMSException {
}
@Override
public boolean getJMSRedelivered() throws JMSException {
return false;
}
@Override
public void setJMSRedelivered(boolean b) throws JMSException {
}
@Override
public String getJMSType() throws JMSException {
return null;
}
@Override
public void setJMSType(String s) throws JMSException {
}
@Override
public long getJMSExpiration() throws JMSException {
return 0;
}
@Override
public void setJMSExpiration(long l) throws JMSException {
}
@Override
public int getJMSPriority() throws JMSException {
return 0;
}
@Override
public void setJMSPriority(int i) throws JMSException {
}
@Override
public void clearProperties() throws JMSException {
}
@Override
public boolean propertyExists(String s) throws JMSException {
return false;
}
@Override
public boolean getBooleanProperty(String s) throws JMSException {
return false;
}
@Override
public byte getByteProperty(String s) throws JMSException {
return 0;
}
@Override
public short getShortProperty(String s) throws JMSException {
return 0;
}
@Override
public int getIntProperty(String s) throws JMSException {
return 0;
}
@Override
public long getLongProperty(String s) throws JMSException {
return 0;
}
@Override
public float getFloatProperty(String s) throws JMSException {
return 0;
}
@Override
public double getDoubleProperty(String s) throws JMSException {
return 0;
}
@Override
public String getStringProperty(String s) throws JMSException {
return null;
}
@Override
public Object getObjectProperty(String s) throws JMSException {
return null;
}
@Override
public Enumeration getPropertyNames() throws JMSException {
return null;
}
@Override
public void setBooleanProperty(String s, boolean b) throws JMSException {
}
@Override
public void setByteProperty(String s, byte b) throws JMSException {
}
@Override
public void setShortProperty(String s, short i) throws JMSException {
}
@Override
public void setIntProperty(String s, int i) throws JMSException {
}
@Override
public void setLongProperty(String s, long l) throws JMSException {
}
@Override
public void setFloatProperty(String s, float v) throws JMSException {
}
@Override
public void setDoubleProperty(String s, double v) throws JMSException {
}
@Override
public void setStringProperty(String s, String s1) throws JMSException {
}
@Override
public void setObjectProperty(String s, Object o) throws JMSException {
}
@Override
public void acknowledge() throws JMSException {
}
@Override
public void clearBody() throws JMSException {
}
}
private EnhancedInstance enhancedInstance = new EnhancedInstance() {
@Override
public Object getSkyWalkingDynamicField() {
return "localhost:60601";
}
@Override
public void setSkyWalkingDynamicField(Object value) {
}
};
@Before
public void setUp() {
producerInterceptor = new ActiveMQProducerInterceptor();
mqDestination = new MQDestination();
mqDestination.setPhysicalName("test");
message = new Msg();
arguments = new Object[] {mqDestination, message};
argumentType = new Class[] {ActiveMQMessageProducer.class};
}
@Test
public void testSendMessage() throws Throwable {
producerInterceptor.beforeMethod(enhancedInstance, null, arguments, null, null);
producerInterceptor.afterMethod(enhancedInstance, null, arguments, null, null);
List<TraceSegment> traceSegmentList = segmentStorage.getTraceSegments();
assertThat(traceSegmentList.size(), is(1));
TraceSegment segment = traceSegmentList.get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(segment);
assertThat(spans.size(), is(1));
assertMessageSpan(spans.get(0));
}
private void assertMessageSpan(AbstractTracingSpan span) {
SpanAssert.assertTag(span, 0, "localhost:60601");
SpanAssert.assertTag(span, 1, "test");
SpanAssert.assertComponent(span, ACTIVEMQ_PRODUCER);
SpanAssert.assertLayer(span, SpanLayer.MQ);
assertThat(span.getOperationName(), is("ActiveMQ/Queue/test/Producer"));
}
}
...@@ -57,6 +57,7 @@ ...@@ -57,6 +57,7 @@
<module>servicecomb-plugin</module> <module>servicecomb-plugin</module>
<module>hystrix-1.x-plugin</module> <module>hystrix-1.x-plugin</module>
<module>sofarpc-plugin</module> <module>sofarpc-plugin</module>
<module>activemq-5.x-plugin</module>
</modules> </modules>
<packaging>pom</packaging> <packaging>pom</packaging>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册