未验证 提交 e9fc6bc6 编写于 作者: D divyakumarjain 提交者: GitHub

Fix #6835 Add compatibility with Kafka client version 2.8.x (#6837)

上级 10ddf0c3
......@@ -16,6 +16,7 @@ Release Notes.
* resolve agent has no retries if connect kafka cluster failed when bootstrap
* Add Seata in the component definition. Seata plugin hosts on Seata project.
* Extended Kafka plugin to properly trace consumers that have topic partitions directly assigned.
* Support Kafka consumer 2.8.0.
* Support print SkyWalking context to logs.
* Add `MessageListener` enhancement in pulsar plugin.
* fix a bug that spring-mvc set an error endpoint name if the controller class annotation implements an interface.
......
......@@ -18,22 +18,15 @@
package org.apache.skywalking.apm.plugin.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
/**
*
**/
public class ConsumerConstructorInterceptor implements InstanceConstructorInterceptor {
public abstract class AbstractConstructorInterceptPoint<T> implements InstanceConstructorInterceptor {
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
ConsumerConfig config = (ConsumerConfig) allArguments[0];
// set the bootstrap server address
ConsumerEnhanceRequiredInfo requiredInfo = new ConsumerEnhanceRequiredInfo();
requiredInfo.setBrokerServers(config.getList("bootstrap.servers"));
requiredInfo.setGroupId(config.getString("group.id"));
@Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
ConsumerEnhanceRequiredInfo requiredInfo = resolveConsumerEnhanceRequiredInfo((T) allArguments[0]);
objInst.setSkyWalkingDynamicField(requiredInfo);
}
protected abstract ConsumerEnhanceRequiredInfo resolveConsumerEnhanceRequiredInfo(T allArgument);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
public class ConstructorWithConsumerConfigInterceptPoint extends AbstractConstructorInterceptPoint<ConsumerConfig> {
@Override
protected ConsumerEnhanceRequiredInfo resolveConsumerEnhanceRequiredInfo(ConsumerConfig configArgument) {
ConsumerEnhanceRequiredInfo requiredInfo = new ConsumerEnhanceRequiredInfo();
if (configArgument != null) {
// set the bootstrap server address
requiredInfo.setBrokerServers(configArgument.getList("bootstrap.servers"));
requiredInfo.setGroupId(configArgument.getString("group.id"));
}
return requiredInfo;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
*
**/
public class ConstructorWithMapInterceptPoint extends AbstractConstructorInterceptPoint<Map<String, ?>> {
protected ConsumerEnhanceRequiredInfo resolveConsumerEnhanceRequiredInfo(Map<String, ?> configArgument) {
ConsumerEnhanceRequiredInfo requiredInfo = new ConsumerEnhanceRequiredInfo();
if (configArgument != null) {
// set the bootstrap server address
requiredInfo.setBrokerServers(convertToList(configArgument.get("bootstrap.servers")));
requiredInfo.setGroupId((String) configArgument.get("group.id"));
}
return requiredInfo;
}
private List<String> convertToList(Object value) {
if (value instanceof List)
return (List<String>) value;
else if (value instanceof String) {
return Arrays.stream(((String) value).split(",")).collect(Collectors.toList());
}
return Collections.emptyList();
}
}
......@@ -41,7 +41,9 @@ import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName
public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
public static final String CONSTRUCTOR_INTERCEPT_TYPE = "org.apache.kafka.clients.consumer.ConsumerConfig";
public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ConsumerConstructorInterceptor";
public static final String CONSTRUCTOR_INTERCEPT_MAP_TYPE = "java.util.Map";
public static final String CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ConstructorWithConsumerConfigInterceptPoint";
public static final String MAP_CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ConstructorWithMapInterceptPoint";
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaConsumerInterceptor";
public static final String ENHANCE_METHOD = "pollOnce";
public static final String ENHANCE_COMPATIBLE_METHOD = "pollForFetches";
......@@ -65,9 +67,21 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
@Override
public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPTOR_CLASS;
return CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
},
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_MAP_TYPE);
}
@Override
public String getConstructorInterceptor() {
return MAP_CONSTRUCTOR_INTERCEPTOR_CLASS;
}
},
};
}
......
......@@ -34,15 +34,15 @@ import static org.hamcrest.core.Is.is;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class ConsumerConstructorInterceptorTest {
public class ConstructorWithConsumerConfigInterceptPointTest {
@Mock
private ConsumerConfig consumerConfig;
private ConsumerConfig consumerConfig;
@Mock
private ConsumerConstructorInterceptor constructorInterceptor;
private ConstructorWithConsumerConfigInterceptPoint constructorInterceptor;
private EnhancedInstance enhancedInstance = new EnhancedInstance() {
private EnhancedInstance enhancedInstance = new EnhancedInstance() {
private ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo;
@Override
......@@ -58,12 +58,12 @@ public class ConsumerConstructorInterceptorTest {
@Before
public void setUp() {
List<String> mockBootstrapServers = new ArrayList<String>();
List<String> mockBootstrapServers = new ArrayList<>();
mockBootstrapServers.add("localhost:9092");
mockBootstrapServers.add("localhost:19092");
when(consumerConfig.getList("bootstrap.servers")).thenReturn(mockBootstrapServers);
constructorInterceptor = new ConsumerConstructorInterceptor();
constructorInterceptor = new ConstructorWithConsumerConfigInterceptPoint();
}
@Test
......@@ -73,4 +73,4 @@ public class ConsumerConstructorInterceptorTest {
assertThat(consumerEnhanceRequiredInfo.getBrokerServers(), is("localhost:9092;localhost:19092"));
}
}
\ No newline at end of file
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class ConstructorWithMapInterceptPointTest {
@Mock
private Map<String, String> consumerConfig;
@Mock
private ConstructorWithMapInterceptPoint constructorInterceptor;
private EnhancedInstance enhancedInstance = new EnhancedInstance() {
private ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo;
@Override
public Object getSkyWalkingDynamicField() {
return consumerEnhanceRequiredInfo;
}
@Override
public void setSkyWalkingDynamicField(Object value) {
consumerEnhanceRequiredInfo = (ConsumerEnhanceRequiredInfo) value;
}
};
@Before
public void setUp() {
String mockBootstrapServers = "localhost:9092,localhost:19092";
when(consumerConfig.get("bootstrap.servers")).thenReturn(mockBootstrapServers);
constructorInterceptor = new ConstructorWithMapInterceptPoint();
}
@Test
public void testOnConsumer() {
constructorInterceptor.onConstruct(enhancedInstance, new Object[] {consumerConfig});
ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo = (ConsumerEnhanceRequiredInfo) enhancedInstance.getSkyWalkingDynamicField();
assertThat(consumerEnhanceRequiredInfo.getBrokerServers(), is("localhost:9092;localhost:19092"));
}
}
......@@ -58,7 +58,7 @@ metrics based on the tracing data.
* [Apache CXF](https://github.com/apache/cxf) 3.x
* MQ
* [RocketMQ](https://github.com/apache/rocketmq) 4.x
* [Kafka](http://kafka.apache.org) 0.11.0.0 -> 2.6.1
* [Kafka](http://kafka.apache.org) 0.11.0.0 -> 2.8.0
* [Spring-Kafka](https://github.com/spring-projects/spring-kafka) Spring Kafka Consumer 1.3.x -> 2.3.x (2.0.x and 2.1.x not tested and not recommended by [the official document](https://spring.io/projects/spring-kafka))
* [ActiveMQ](https://github.com/apache/activemq) 5.10.0 -> 5.15.4
* [RabbitMQ](https://www.rabbitmq.com/) 5.x
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册