Unverified commit b4114ac0, authored by Darcy, committed by GitHub

enhancement agent kafka reporter plugin, add namespace concept (#6781)

Parent 16b51d55
......@@ -12,6 +12,8 @@ Release Notes.
* Improve `GlobalIdGenerator` performance.
* Add an agent plugin to support elasticsearch7.
* Add `jsonrpc4j` agent plugin.
* Add the `plugin.kafka.namespace` option to support multiple SkyWalking clusters sharing the same Kafka cluster.
* Make the agent keep retrying the Kafka cluster connection if it fails during bootstrap.
* Add Seata in the component definition. Seata plugin hosts on Seata project.
* Extended Kafka plugin to properly trace consumers that have topic partitions directly assigned.
* Support print SkyWalking context to logs.
......
......@@ -107,6 +107,8 @@ plugin.mount=${SW_MOUNT_FOLDERS:plugins,activations}
# Kafka producer configuration
# plugin.kafka.bootstrap_servers=${SW_KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
# If you want to set a namespace, please make sure the OAP server has set the same one in its Kafka fetcher module
# plugin.kafka.namespace=${SW_KAFKA_NAMESPACE:""}
# Match spring bean with regex expression for classname
# plugin.springannotation.classname_match_regex=${SW_SPRINGANNOTATION_CLASSNAME_MATCH_REGEX:}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.kafka;
enum KafkaConnectionStatus {
CONNECTED, DISCONNECT
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.kafka;
interface KafkaConnectionStatusListener {
void onStatusChanged(KafkaConnectionStatus status);
}
......@@ -19,6 +19,7 @@
package org.apache.skywalking.apm.agent.core.kafka;
import org.apache.skywalking.apm.agent.core.boot.OverrideImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus;
......@@ -26,11 +27,15 @@ import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus;
* For compatibility with {@link ContextManagerExtendService}, there is no need to manage the connection status here.
*/
@OverrideImplementor(ContextManagerExtendService.class)
public class KafkaContextManagerExtendService extends ContextManagerExtendService {
public class KafkaContextManagerExtendService extends ContextManagerExtendService implements KafkaConnectionStatusListener {
@Override
public void prepare() {
statusChanged(GRPCChannelStatus.CONNECTED);
ServiceManager.INSTANCE.findService(KafkaProducerManager.class).addListener(this);
}
@Override
public void onStatusChanged(KafkaConnectionStatus status) {
statusChanged(GRPCChannelStatus.CONNECTED);
}
}
......@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Bytes;
......@@ -38,13 +39,11 @@ import org.apache.skywalking.apm.network.language.agent.v3.JVMMetricCollection;
* A report to send JVM Metrics data to Kafka Broker.
*/
@OverrideImplementor(JVMMetricsSender.class)
public class KafkaJVMMetricsSender extends JVMMetricsSender {
public class KafkaJVMMetricsSender extends JVMMetricsSender implements KafkaConnectionStatusListener {
private static final ILog LOGGER = LogManager.getLogger(KafkaJVMMetricsSender.class);
private KafkaProducer<String, Bytes> producer;
private String topic;
private BlockingQueue<JVMMetric> queue;
private volatile boolean running = false;
@Override
public void run() {
......@@ -52,7 +51,7 @@ public class KafkaJVMMetricsSender extends JVMMetricsSender {
List<JVMMetric> buffer = new ArrayList<>();
queue.drainTo(buffer);
if (running) {
if (producer != null) {
JVMMetricCollection metrics = JVMMetricCollection.newBuilder()
.addAllMetrics(buffer)
.setService(Config.Agent.SERVICE_NAME)
......@@ -61,15 +60,15 @@ public class KafkaJVMMetricsSender extends JVMMetricsSender {
if (LOGGER.isDebugEnable()) {
LOGGER.debug(
"JVM metrics reporting, topic: {}, key: {}, length: {}", topic, metrics.getServiceInstance(),
buffer.size()
"JVM metrics reporting, topic: {}, key: {}, length: {}", topic, metrics.getServiceInstance(),
buffer.size()
);
}
producer.send(new ProducerRecord<>(
topic,
metrics.getServiceInstance(),
Bytes.wrap(metrics.toByteArray())
topic,
metrics.getServiceInstance(),
Bytes.wrap(metrics.toByteArray())
));
producer.flush();
}
......@@ -79,14 +78,13 @@ public class KafkaJVMMetricsSender extends JVMMetricsSender {
@Override
public void prepare() {
queue = new LinkedBlockingQueue<>(Config.Jvm.BUFFER_SIZE);
topic = KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_METRICS;
KafkaProducerManager producerManager = ServiceManager.INSTANCE.findService(KafkaProducerManager.class);
producerManager.addListener(this);
topic = producerManager.formatTopicNameThenRegister(KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_METRICS);
}
@Override
public void boot() {
producer = ServiceManager.INSTANCE.findService(KafkaProducerManager.class).getProducer();
running = true;
}
@Override
......@@ -96,4 +94,11 @@ public class KafkaJVMMetricsSender extends JVMMetricsSender {
queue.offer(metric);
}
}
@Override
public void onStatusChanged(KafkaConnectionStatus status) {
if (status == KafkaConnectionStatus.CONNECTED) {
producer = ServiceManager.INSTANCE.findService(KafkaProducerManager.class).getProducer();
}
}
}
......@@ -19,6 +19,7 @@
package org.apache.skywalking.apm.agent.core.kafka;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Bytes;
......@@ -37,7 +38,7 @@ import org.apache.skywalking.apm.network.language.agent.v3.MeterDataCollection;
* A report to send Metrics data of meter system to Kafka Broker.
*/
@OverrideImplementor(MeterSender.class)
public class KafkaMeterSender extends MeterSender {
public class KafkaMeterSender extends MeterSender implements KafkaConnectionStatusListener {
private static final ILog LOGGER = LogManager.getLogger(KafkaTraceSegmentServiceClient.class);
private String topic;
......@@ -45,16 +46,20 @@ public class KafkaMeterSender extends MeterSender {
@Override
public void prepare() {
topic = KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_METER;
KafkaProducerManager producerManager = ServiceManager.INSTANCE.findService(KafkaProducerManager.class);
producerManager.addListener(this);
topic = producerManager.formatTopicNameThenRegister(KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_METER);
}
@Override
public void boot() {
producer = ServiceManager.INSTANCE.findService(KafkaProducerManager.class).getProducer();
}
@Override
public void send(Map<MeterId, BaseMeter> meterMap, MeterService meterService) {
if (producer == null) {
return;
}
MeterDataCollection.Builder builder = MeterDataCollection.newBuilder();
transform(meterMap, meterData -> {
if (LOGGER.isDebugEnable()) {
......@@ -67,4 +72,11 @@ public class KafkaMeterSender extends MeterSender {
producer.flush();
}
@Override
public void onStatusChanged(KafkaConnectionStatus status) {
if (status == KafkaConnectionStatus.CONNECTED) {
producer = ServiceManager.INSTANCE.findService(KafkaProducerManager.class).getProducer();
}
}
}
......@@ -18,14 +18,19 @@
package org.apache.skywalking.apm.agent.core.kafka;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.producer.KafkaProducer;
......@@ -35,8 +40,13 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.apm.util.StringUtil;
/**
* Configuring, initializing and holding a KafkaProducer instance for reporters.
......@@ -46,23 +56,53 @@ public class KafkaProducerManager implements BootService, Runnable {
private static final ILog LOGGER = LogManager.getLogger(KafkaProducerManager.class);
private KafkaProducer<String, Bytes> producer;
private Set<String> topics = new HashSet<>();
private List<KafkaConnectionStatusListener> listeners = new ArrayList<>();
private volatile KafkaProducer<String, Bytes> producer;
private ScheduledFuture<?> bootProducerFuture;
@Override
public void prepare() {
}
@Override
public void boot() {
bootProducerFuture = Executors.newSingleThreadScheduledExecutor(
new DefaultNamedThreadFactory("kafkaProducerInitThread")
).scheduleAtFixedRate(new RunnableWithExceptionProtection(
this,
t -> LOGGER.error("unexpected exception.", t)
), 0, 120, TimeUnit.SECONDS);
}
String formatTopicNameThenRegister(String topic) {
String topicName = StringUtil.isBlank(KafkaReporterPluginConfig.Plugin.Kafka.NAMESPACE) ? topic
: KafkaReporterPluginConfig.Plugin.Kafka.NAMESPACE + "-" + topic;
topics.add(topicName);
return topicName;
}
public void addListener(KafkaConnectionStatusListener listener) {
if (!listeners.contains(listener)) {
listeners.add(listener);
}
}
@Override
public void onComplete() {
}
@Override
public void prepare() throws Throwable {
public void run() {
Properties properties = new Properties();
properties.setProperty(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaReporterPluginConfig.Plugin.Kafka.BOOTSTRAP_SERVERS);
KafkaReporterPluginConfig.Plugin.Kafka.PRODUCER_CONFIG.forEach((k, v) -> properties.setProperty(k, v));
KafkaReporterPluginConfig.Plugin.Kafka.PRODUCER_CONFIG.forEach(properties::setProperty);
AdminClient adminClient = AdminClient.create(properties);
DescribeTopicsResult topicsResult = adminClient.describeTopics(Arrays.asList(
KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_MANAGEMENT,
KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_METRICS,
KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_PROFILING,
KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_SEGMENT,
KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_METER
));
DescribeTopicsResult topicsResult = adminClient.describeTopics(topics);
Set<String> topics = topicsResult.values().entrySet().stream()
.map(entry -> {
try {
......@@ -79,24 +119,24 @@ public class KafkaProducerManager implements BootService, Runnable {
.filter(Objects::nonNull)
.collect(Collectors.toSet());
if (!topics.isEmpty()) {
throw new Exception("These topics" + topics + " don't exist.");
LOGGER.warn("kafka topics {} is not exist, connect to kafka cluster abort", topics);
return;
}
producer = new KafkaProducer<>(properties, new StringSerializer(), new BytesSerializer());
}
@Override
public void boot() {
}
@Override
public void onComplete() {
try {
producer = new KafkaProducer<>(properties, new StringSerializer(), new BytesSerializer());
} catch (Exception e) {
LOGGER.error(e, "connect to kafka cluster '{}' failed", KafkaReporterPluginConfig.Plugin.Kafka.BOOTSTRAP_SERVERS);
}
// notify listeners to start sending data if no exception has been thrown
notifyListeners(KafkaConnectionStatus.CONNECTED);
bootProducerFuture.cancel(true);
}
@Override
public void run() {
private void notifyListeners(KafkaConnectionStatus status) {
for (KafkaConnectionStatusListener listener : listeners) {
listener.onStatusChanged(status);
}
}
/**
......@@ -106,6 +146,16 @@ public class KafkaProducerManager implements BootService, Runnable {
return producer;
}
/**
* Make the Kafka producer initialize later, but still before {@link GRPCChannelManager}.
*
* @return priority value
*/
@Override
public int priority() {
return ServiceManager.INSTANCE.findService(GRPCChannelManager.class).priority() - 1;
}
@Override
public void shutdown() {
producer.flush();
......
......@@ -19,6 +19,7 @@
package org.apache.skywalking.apm.agent.core.kafka;
import java.util.List;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Bytes;
......@@ -35,7 +36,7 @@ import org.apache.skywalking.apm.network.language.profile.v3.ThreadSnapshot;
* to report the tracing profile snapshot data by Kafka Producer.
*/
@OverrideImplementor(ProfileSnapshotSender.class)
public class KafkaProfileSnapshotSender extends ProfileSnapshotSender {
public class KafkaProfileSnapshotSender extends ProfileSnapshotSender implements KafkaConnectionStatusListener {
private static final ILog LOGGER = LogManager.getLogger(ProfileSnapshotSender.class);
private String topic;
......@@ -43,16 +44,20 @@ public class KafkaProfileSnapshotSender extends ProfileSnapshotSender {
@Override
public void prepare() {
topic = KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_PROFILING;
KafkaProducerManager producerManager = ServiceManager.INSTANCE.findService(KafkaProducerManager.class);
producerManager.addListener(this);
topic = producerManager.formatTopicNameThenRegister(KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_PROFILING);
}
@Override
public void boot() {
producer = ServiceManager.INSTANCE.findService(KafkaProducerManager.class).getProducer();
}
@Override
public void send(final List<TracingThreadSnapshot> buffer) {
if (producer == null) {
return;
}
for (TracingThreadSnapshot snapshot : buffer) {
final ThreadSnapshot object = snapshot.transform();
if (LOGGER.isDebugEnable()) {
......@@ -69,4 +74,10 @@ public class KafkaProfileSnapshotSender extends ProfileSnapshotSender {
}
}
@Override
public void onStatusChanged(KafkaConnectionStatus status) {
if (status == KafkaConnectionStatus.CONNECTED) {
producer = ServiceManager.INSTANCE.findService(KafkaProducerManager.class).getProducer();
}
}
}
......@@ -42,6 +42,8 @@ public class KafkaReporterPluginConfig {
public static String TOPIC_METER = "skywalking-meters";
public static String NAMESPACE = "";
public static Map<String, String> PRODUCER_CONFIG = new HashMap<>();
/**
......
......@@ -47,7 +47,7 @@ import org.apache.skywalking.apm.util.StringUtil;
* A service management data(Instance registering properties and Instance pinging) reporter.
*/
@OverrideImplementor(ServiceManagementClient.class)
public class KafkaServiceManagementServiceClient implements BootService, Runnable {
public class KafkaServiceManagementServiceClient implements BootService, Runnable, KafkaConnectionStatusListener {
private static final ILog LOGGER = LogManager.getLogger(KafkaServiceManagementServiceClient.class);
private static List<KeyStringValuePair> SERVICE_INSTANCE_PROPERTIES;
......@@ -62,7 +62,9 @@ public class KafkaServiceManagementServiceClient implements BootService, Runnabl
@Override
public void prepare() {
topic = KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_MANAGEMENT;
KafkaProducerManager producerManager = ServiceManager.INSTANCE.findService(KafkaProducerManager.class);
producerManager.addListener(this);
topic = producerManager.formatTopicNameThenRegister(KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_MANAGEMENT);
SERVICE_INSTANCE_PROPERTIES = new ArrayList<>();
for (String key : Config.Agent.INSTANCE_PROPERTIES.keySet()) {
......@@ -79,8 +81,6 @@ public class KafkaServiceManagementServiceClient implements BootService, Runnabl
@Override
public void boot() {
producer = ServiceManager.INSTANCE.findService(KafkaProducerManager.class).getProducer();
heartbeatFuture = Executors.newSingleThreadScheduledExecutor(
new DefaultNamedThreadFactory("ServiceManagementClientKafkaProducer")
).scheduleAtFixedRate(new RunnableWithExceptionProtection(
......@@ -91,6 +91,9 @@ public class KafkaServiceManagementServiceClient implements BootService, Runnabl
@Override
public void run() {
if (producer == null) {
return;
}
if (Math.abs(sendPropertiesCounter.getAndAdd(1)) % Config.Collector.PROPERTIES_REPORT_PERIOD_FACTOR == 0) {
InstanceProperties instance = InstanceProperties.newBuilder()
.setService(Config.Agent.SERVICE_NAME)
......@@ -120,6 +123,13 @@ public class KafkaServiceManagementServiceClient implements BootService, Runnabl
}
@Override
public void onStatusChanged(KafkaConnectionStatus status) {
if (status == KafkaConnectionStatus.CONNECTED) {
producer = ServiceManager.INSTANCE.findService(KafkaProducerManager.class).getProducer();
}
}
@Override
public void shutdown() {
heartbeatFuture.cancel(true);
......
......@@ -44,7 +44,7 @@ import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.CHANNEL_SI
* A tracing segment data reporter.
*/
@OverrideImplementor(TraceSegmentServiceClient.class)
public class KafkaTraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener {
public class KafkaTraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, KafkaConnectionStatusListener {
private static final ILog LOGGER = LogManager.getLogger(KafkaTraceSegmentServiceClient.class);
private String topic;
......@@ -54,15 +54,15 @@ public class KafkaTraceSegmentServiceClient implements BootService, IConsumer<Tr
@Override
public void prepare() {
topic = KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_SEGMENT;
KafkaProducerManager producerManager = ServiceManager.INSTANCE.findService(KafkaProducerManager.class);
producerManager.addListener(this);
topic = producerManager.formatTopicNameThenRegister(KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_SEGMENT);
}
@Override
public void boot() {
carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE, BufferStrategy.IF_POSSIBLE);
carrier.consume(this, 1);
producer = ServiceManager.INSTANCE.findService(KafkaProducerManager.class).getProducer();
}
@Override
......@@ -83,6 +83,9 @@ public class KafkaTraceSegmentServiceClient implements BootService, IConsumer<Tr
@Override
public void consume(final List<TraceSegment> data) {
if (producer == null) {
return;
}
data.forEach(traceSegment -> {
SegmentObject upstreamSegment = traceSegment.transform();
ProducerRecord<String, Bytes> record = new ProducerRecord<>(
......@@ -121,4 +124,10 @@ public class KafkaTraceSegmentServiceClient implements BootService, IConsumer<Tr
carrier.produce(traceSegment);
}
@Override
public void onStatusChanged(KafkaConnectionStatus status) {
if (status == KafkaConnectionStatus.CONNECTED) {
producer = ServiceManager.INSTANCE.findService(KafkaProducerManager.class).getProducer();
}
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.kafka;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
public class KafkaProducerManagerTest {
@Test
public void testAddListener() throws Exception {
KafkaProducerManager kafkaProducerManager = new KafkaProducerManager();
AtomicInteger counter = new AtomicInteger();
int times = 100;
for (int i = 0; i < times; i++) {
kafkaProducerManager.addListener(new MockListener(counter));
}
Whitebox.invokeMethod(kafkaProducerManager, "notifyListeners", KafkaConnectionStatus.CONNECTED);
assertEquals(counter.get(), times);
}
@Test
public void testFormatTopicNameThenRegister() {
KafkaProducerManager kafkaProducerManager = new KafkaProducerManager();
KafkaReporterPluginConfig.Plugin.Kafka.NAMESPACE = "product";
String value = kafkaProducerManager.formatTopicNameThenRegister(KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_METRICS);
String expectValue = KafkaReporterPluginConfig.Plugin.Kafka.NAMESPACE + "-" + KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_METRICS;
assertEquals(value, expectValue);
KafkaReporterPluginConfig.Plugin.Kafka.NAMESPACE = "";
value = kafkaProducerManager.formatTopicNameThenRegister(KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_METRICS);
assertEquals(KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_METRICS, value);
}
static class MockListener implements KafkaConnectionStatusListener {
private AtomicInteger counter;
public MockListener(AtomicInteger counter) {
this.counter = counter;
}
@Override
public void onStatusChanged(KafkaConnectionStatus status) {
counter.incrementAndGet();
}
}
}
......@@ -84,11 +84,15 @@ Kafka Fetcher pulls messages from Kafka Broker(s) what is the Agent delivered. C
Kafka Fetcher is disabled by default; configure it as follows to enable it.
The `namespace` option isolates multiple OAP clusters that share the same Kafka cluster.
If you set a namespace for the Kafka fetcher, OAP adds it as a prefix to every topic name. You should also set the same namespace in `agent.config`, via the property `plugin.kafka.namespace`.
```yaml
kafka-fetcher:
selector: ${SW_KAFKA_FETCHER:default}
default:
bootstrapServers: ${SW_KAFKA_FETCHER_SERVERS:localhost:9092}
namespace: ${SW_NAMESPACE:""}
```
`skywalking-segments`, `skywalking-metrics`, `skywalking-profile`, `skywalking-managements`, `skywalking-meters`
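For example, with `namespace: product` (a value chosen only for illustration), both the agent and the OAP fetcher resolve `skywalking-segments` to `product-skywalking-segments`. The following is a minimal sketch of that shared naming rule, assuming only the prefix behavior described above; the hypothetical `TopicNameSketch` class is not part of the codebase (the real logic lives in `KafkaProducerManager#formatTopicNameThenRegister` on the agent side and `AbstractKafkaHandler#getTopic` on the OAP side, which may additionally prepend an MM2 source alias):

```java
// Illustrative sketch only: mirrors the namespace-prefix rule introduced in this change.
public final class TopicNameSketch {
    static String topicOf(String namespace, String plainTopic) {
        // A blank namespace leaves the topic name unchanged.
        if (namespace == null || namespace.trim().isEmpty()) {
            return plainTopic;
        }
        // Otherwise the namespace is prepended, separated by "-".
        return namespace + "-" + plainTopic;
    }

    public static void main(String[] args) {
        System.out.println(topicOf("", "skywalking-segments"));        // -> skywalking-segments
        System.out.println(topicOf("product", "skywalking-segments")); // -> product-skywalking-segments
    }
}
```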
......@@ -102,6 +106,7 @@ kafka-fetcher:
selector: ${SW_KAFKA_FETCHER:default}
default:
bootstrapServers: ${SW_KAFKA_FETCHER_SERVERS:localhost:9092}
namespace: ${SW_NAMESPACE:""}
partitions: ${SW_KAFKA_FETCHER_PARTITIONS:3}
replicationFactor: ${SW_KAFKA_FETCHER_PARTITIONS_FACTOR:2}
enableMeterSystem: ${SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM:false}
......@@ -117,6 +122,7 @@ kafka-fetcher:
selector: ${SW_KAFKA_FETCHER:default}
default:
bootstrapServers: ${SW_KAFKA_FETCHER_SERVERS:localhost:9092}
namespace: ${SW_NAMESPACE:""}
partitions: ${SW_KAFKA_FETCHER_PARTITIONS:3}
replicationFactor: ${SW_KAFKA_FETCHER_PARTITIONS_FACTOR:2}
enableMeterSystem: ${SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM:false}
......@@ -133,6 +139,7 @@ kafka-fetcher:
selector: ${SW_KAFKA_FETCHER:default}
default:
bootstrapServers: ${SW_KAFKA_FETCHER_SERVERS:localhost:9092}
namespace: ${SW_NAMESPACE:""}
partitions: ${SW_KAFKA_FETCHER_PARTITIONS:3}
replicationFactor: ${SW_KAFKA_FETCHER_PARTITIONS_FACTOR:2}
enableMeterSystem: ${SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM:false}
......
......@@ -210,6 +210,7 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | active | Activate the Prometheus fetcher. | SW_PROMETHEUS_FETCHER_ACTIVE | false |
| kafka-fetcher | default | Read [fetcher doc](backend-fetcher.md) for more details | - | - |
| - | - | bootstrapServers | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. | SW_KAFKA_FETCHER_SERVERS | localhost:9092 |
| - | - | namespace | Isolates multiple OAP clusters that use the same Kafka cluster. If you set a namespace for the Kafka fetcher, OAP adds it as a prefix to every topic name. You should also set the same namespace in `agent.config`, via the property named `plugin.kafka.namespace`. | SW_NAMESPACE | - |
| - | - | groupId | A unique string that identifies the consumer group this consumer belongs to.| - | skywalking-consumer |
| - | - | consumePartitions | Which PartitionId(s) of the topics assign to the OAP server. If more than one, is separated by commas. | SW_KAFKA_FETCHER_CONSUME_PARTITIONS | - |
| - | - | isSharding | it was true when OAP Server in cluster. | SW_KAFKA_FETCHER_IS_SHARDING | false |
......
......@@ -161,6 +161,7 @@ property key | Description | Default |
`plugin.kafka.topic_segment` | Specify which Kafka topic name for traces data to report to. | `skywalking_segments` |
`plugin.kafka.topic_profilings` | Specify which Kafka topic name for Thread Profiling snapshot to report to. | `skywalking_profilings` |
`plugin.kafka.topic_management` | Specify which Kafka topic name for the register or heartbeat data of Service Instance to report to. | `skywalking_managements` |
`plugin.kafka.namespace` | Isolate multiple OAP servers that use the same Kafka cluster (the namespace is prepended to every Kafka topic name, separated by `-`). | `` |
`plugin.springannotation.classname_match_regex` | Match spring beans with regular expression for the class name. Multiple expressions could be separated by a comma. This only works when `Spring annotation plugin` has been activated. | `All the spring beans tagged with @Bean,@Service,@Dao, or @Repository.` |
`plugin.toolkit.log.transmit_formatted` | Whether or not to transmit logged data as formatted or un-formatted. | `true` |
`plugin.toolkit.log.grpc.reporter.server_host` | Specify which grpc server's host for log data to report to. | `127.0.0.1` |
......
......@@ -340,6 +340,7 @@ kafka-fetcher:
selector: ${SW_KAFKA_FETCHER:-}
default:
bootstrapServers: ${SW_KAFKA_FETCHER_SERVERS:localhost:9092}
namespace: ${SW_NAMESPACE:""}
partitions: ${SW_KAFKA_FETCHER_PARTITIONS:3}
replicationFactor: ${SW_KAFKA_FETCHER_PARTITIONS_FACTOR:2}
enableMeterSystem: ${SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM:false}
......
......@@ -88,6 +88,8 @@ public class KafkaFetcherConfig extends ModuleConfig {
private int kafkaHandlerThreadPoolSize;
private int kafkaHandlerThreadPoolQueueSize;
private String namespace = "";
private String mm2SourceAlias = "";
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler;
import org.apache.commons.lang3.StringUtils;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
public abstract class AbstractKafkaHandler implements KafkaHandler {
protected KafkaFetcherConfig config;
public AbstractKafkaHandler(ModuleManager manager, KafkaFetcherConfig config) {
this.config = config;
}
@Override
public String getTopic() {
StringBuilder sb = new StringBuilder();
if (StringUtils.isNotBlank(config.getMm2SourceAlias())) {
sb.append(config.getMm2SourceAlias()).append(config.getMm2SourceSeparator());
}
if (StringUtil.isNotBlank(config.getNamespace())) {
sb.append(config.getNamespace()).append("-");
}
sb.append(getPlainTopic());
return sb.toString();
}
protected abstract String getPlainTopic();
@Override
public String getConsumePartitions() {
return config.getConsumePartitions();
}
}
......@@ -32,19 +32,17 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
* A handler deserializes the message of JVM Metrics and pushes it to downstream.
*/
@Slf4j
public class JVMMetricsHandler implements KafkaHandler {
public class JVMMetricsHandler extends AbstractKafkaHandler {
private final NamingControl namingLengthControl;
private final JVMSourceDispatcher jvmSourceDispatcher;
private final KafkaFetcherConfig config;
public JVMMetricsHandler(ModuleManager moduleManager, KafkaFetcherConfig config) {
super(moduleManager, config);
this.jvmSourceDispatcher = new JVMSourceDispatcher(moduleManager);
this.namingLengthControl = moduleManager.find(CoreModule.NAME)
.provider()
.getService(NamingControl.class);
this.config = config;
}
@Override
......@@ -72,12 +70,7 @@ public class JVMMetricsHandler implements KafkaHandler {
}
@Override
public String getTopic() {
return config.getMm2SourceAlias() + config.getMm2SourceSeparator() + config.getTopicNameOfMetrics();
}
@Override
public String getConsumePartitions() {
return config.getConsumePartitions();
protected String getPlainTopic() {
return config.getTopicNameOfMetrics();
}
}
......@@ -37,14 +37,13 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
* A handler deserializes the message of meter system data and pushes it to downstream.
*/
@Slf4j
public class MeterServiceHandler implements KafkaHandler {
private KafkaFetcherConfig config;
public class MeterServiceHandler extends AbstractKafkaHandler {
private IMeterProcessService processService;
private final HistogramMetrics histogram;
private final CounterMetrics errorCounter;
public MeterServiceHandler(ModuleManager manager, KafkaFetcherConfig config) {
this.config = config;
super(manager, config);
this.processService = manager.find(AnalyzerModule.NAME).provider().getService(IMeterProcessService.class);
MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME)
.provider()
......@@ -80,12 +79,7 @@ public class MeterServiceHandler implements KafkaHandler {
}
@Override
public String getTopic() {
return config.getMm2SourceAlias() + config.getMm2SourceSeparator() + config.getTopicNameOfMeters();
}
@Override
public String getConsumePartitions() {
return config.getConsumePartitions();
protected String getPlainTopic() {
return config.getTopicNameOfMeters();
}
}
......@@ -32,12 +32,10 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
* A handler deserializes the message of profiling snapshot and pushes it to downstream.
*/
@Slf4j
public class ProfileTaskHandler implements KafkaHandler {
private final KafkaFetcherConfig config;
public class ProfileTaskHandler extends AbstractKafkaHandler {
public ProfileTaskHandler(ModuleManager manager, KafkaFetcherConfig config) {
this.config = config;
super(manager, config);
}
@Override
......@@ -67,12 +65,7 @@ public class ProfileTaskHandler implements KafkaHandler {
}
@Override
public String getTopic() {
return config.getMm2SourceAlias() + config.getMm2SourceSeparator() + config.getTopicNameOfProfiling();
}
@Override
public String getConsumePartitions() {
return config.getConsumePartitions();
protected String getPlainTopic() {
return config.getTopicNameOfProfiling();
}
}
......@@ -43,14 +43,13 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
* A handler deserializes the message of Service Management and pushes it to downstream.
*/
@Slf4j
public class ServiceManagementHandler implements KafkaHandler {
public class ServiceManagementHandler extends AbstractKafkaHandler {
private final SourceReceiver sourceReceiver;
private final NamingControl namingLengthControl;
private final KafkaFetcherConfig config;
public ServiceManagementHandler(ModuleManager moduleManager, KafkaFetcherConfig config) {
super(moduleManager, config);
this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
this.namingLengthControl = moduleManager.find(CoreModule.NAME)
.provider()
......@@ -123,12 +122,7 @@ public class ServiceManagementHandler implements KafkaHandler {
}
@Override
public String getTopic() {
return config.getMm2SourceAlias() + config.getMm2SourceSeparator() + config.getTopicNameOfManagements();
}
@Override
public String getConsumePartitions() {
return config.getConsumePartitions();
protected String getPlainTopic() {
return config.getTopicNameOfManagements();
}
}
......@@ -37,17 +37,15 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
* A handler deserializes the message of the trace segment data and pushes it to downstream.
*/
@Slf4j
public class TraceSegmentHandler implements KafkaHandler {
public class TraceSegmentHandler extends AbstractKafkaHandler {
private final KafkaFetcherConfig config;
private final ISegmentParserService segmentParserService;
private HistogramMetrics histogram;
private CounterMetrics errorCounter;
public TraceSegmentHandler(ModuleManager moduleManager,
KafkaFetcherConfig config) {
this.config = config;
public TraceSegmentHandler(ModuleManager moduleManager, KafkaFetcherConfig config) {
super(moduleManager, config);
this.segmentParserService = moduleManager.find(AnalyzerModule.NAME)
.provider()
.getService(ISegmentParserService.class);
......@@ -96,12 +94,7 @@ public class TraceSegmentHandler implements KafkaHandler {
}
@Override
public String getTopic() {
return config.getMm2SourceAlias() + config.getMm2SourceSeparator() + config.getTopicNameOfTracingSegments();
}
@Override
public String getConsumePartitions() {
return config.getConsumePartitions();
protected String getPlainTopic() {
return config.getTopicNameOfTracingSegments();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.mock.MockModuleManager;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class AbstractKafkaHandlerTest {
@Test
public void testGetTopic() {
KafkaFetcherConfig config = new KafkaFetcherConfig();
MockModuleManager manager = new MockModuleManager() {
@Override
protected void init() {
}
};
String plainTopic = config.getTopicNameOfTracingSegments();
MockKafkaHandler kafkaHandler = new MockKafkaHandler(plainTopic, manager, config);
// unset namespace and mm2
assertEquals(kafkaHandler.getTopic(), plainTopic);
//set namespace only
String namespace = "product";
config.setNamespace(namespace);
assertEquals(namespace + "-" + plainTopic, kafkaHandler.getTopic());
//set mm2 only
config.setNamespace("");
String mm2SourceAlias = "DC1";
config.setMm2SourceAlias(mm2SourceAlias);
String mm2SourceSeparator = ".";
config.setMm2SourceSeparator(mm2SourceSeparator);
assertEquals(mm2SourceAlias + mm2SourceSeparator + plainTopic, kafkaHandler.getTopic());
//set namespace and mm2
config.setNamespace(namespace);
config.setMm2SourceAlias(mm2SourceAlias);
config.setMm2SourceSeparator(mm2SourceSeparator);
assertEquals(mm2SourceAlias + mm2SourceSeparator + namespace + "-" + plainTopic, kafkaHandler.getTopic());
}
static class MockKafkaHandler extends AbstractKafkaHandler {
private String plainTopic;
public MockKafkaHandler(String plainTopic, ModuleManager manager, KafkaFetcherConfig config) {
super(manager, config);
this.plainTopic = plainTopic;
}
@Override
protected String getPlainTopic() {
return plainTopic;
}
@Override
public void handle(ConsumerRecord<String, Bytes> record) {
}
}
}