未验证 提交 38c182e4 编写于 作者: D Daming 提交者: GitHub

Kafka transporter code polish (#7032)

上级 30349633
......@@ -101,39 +101,39 @@ public class KafkaProducerManager implements BootService, Runnable {
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaReporterPluginConfig.Plugin.Kafka.BOOTSTRAP_SERVERS);
KafkaReporterPluginConfig.Plugin.Kafka.PRODUCER_CONFIG.forEach(properties::setProperty);
AdminClient adminClient = AdminClient.create(properties);
DescribeTopicsResult topicsResult = adminClient.describeTopics(topics);
Set<String> topics = topicsResult.values().entrySet().stream()
.map(entry -> {
try {
entry.getValue().get(
KafkaReporterPluginConfig.Plugin.Kafka.GET_TOPIC_TIMEOUT,
TimeUnit.SECONDS
);
return null;
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOGGER.error(e, "Get KAFKA topic:{} error.", entry.getKey());
}
return entry.getKey();
})
.filter(Objects::nonNull)
.collect(Collectors.toSet());
if (!topics.isEmpty()) {
LOGGER.warn("kafka topics {} is not exist, connect to kafka cluster abort", topics);
closeAdminClient(adminClient);
return;
}
try {
producer = new KafkaProducer<>(properties, new StringSerializer(), new BytesSerializer());
} catch (Exception e) {
LOGGER.error(e, "connect to kafka cluster '{}' failed", KafkaReporterPluginConfig.Plugin.Kafka.BOOTSTRAP_SERVERS);
closeAdminClient(adminClient);
return;
try (AdminClient adminClient = AdminClient.create(properties)) {
DescribeTopicsResult topicsResult = adminClient.describeTopics(topics);
Set<String> topics = topicsResult.values().entrySet().stream()
.map(entry -> {
try {
entry.getValue().get(
KafkaReporterPluginConfig.Plugin.Kafka.GET_TOPIC_TIMEOUT,
TimeUnit.SECONDS
);
return null;
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOGGER.error(e, "Get KAFKA topic:{} error.", entry.getKey());
}
return entry.getKey();
})
.filter(Objects::nonNull)
.collect(Collectors.toSet());
if (!topics.isEmpty()) {
LOGGER.warn("kafka topics {} is not exist, connect to kafka cluster abort", topics);
return;
}
try {
producer = new KafkaProducer<>(properties, new StringSerializer(), new BytesSerializer());
} catch (Exception e) {
LOGGER.error(e, "connect to kafka cluster '{}' failed", KafkaReporterPluginConfig.Plugin.Kafka.BOOTSTRAP_SERVERS);
return;
}
//notify listeners to send data if no exception been throw
notifyListeners(KafkaConnectionStatus.CONNECTED);
bootProducerFuture.cancel(true);
}
//notify listeners to send data if no exception been throw
notifyListeners(KafkaConnectionStatus.CONNECTED);
bootProducerFuture.cancel(true);
}
private void notifyListeners(KafkaConnectionStatus status) {
......@@ -142,16 +142,6 @@ public class KafkaProducerManager implements BootService, Runnable {
}
}
private void closeAdminClient(AdminClient adminClient) {
if (adminClient != null) {
try {
adminClient.close();
} catch (Exception e) {
LOGGER.error("close kafka admin client failed", e);
}
}
}
/**
* Get the KafkaProducer instance to send data to Kafka broker.
*/
......
......@@ -21,7 +21,6 @@ package org.apache.skywalking.oap.server.analyzer.agent.kafka;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
......@@ -92,7 +91,7 @@ public class KafkaFetcherHandlerRegister implements Runnable {
try {
entry.getValue().get();
return null;
} catch (InterruptedException | ExecutionException e) {
} catch (InterruptedException | ExecutionException ignore) {
}
return entry.getKey();
})
......@@ -132,7 +131,7 @@ public class KafkaFetcherHandlerRegister implements Runnable {
consumer = new KafkaConsumer<>(properties, new StringDeserializer(), new BytesDeserializer());
executor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue(threadPoolQueueSize),
new ArrayBlockingQueue<>(threadPoolQueueSize),
new CustomThreadFactory("KafkaConsumer"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
......@@ -160,9 +159,7 @@ public class KafkaFetcherHandlerRegister implements Runnable {
try {
ConsumerRecords<String, Bytes> consumerRecords = consumer.poll(Duration.ofMillis(500L));
if (!consumerRecords.isEmpty()) {
Iterator<ConsumerRecord<String, Bytes>> iterator = consumerRecords.iterator();
while (iterator.hasNext()) {
ConsumerRecord<String, Bytes> record = iterator.next();
for (final ConsumerRecord<String, Bytes> record : consumerRecords) {
executor.submit(() -> handlerMap.get(record.topic()).handle(record));
}
if (!enableKafkaMessageAutoCommit) {
......
......@@ -27,6 +27,12 @@ import org.apache.skywalking.oap.server.analyzer.provider.jvm.JVMSourceDispatche
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics.Timer;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
/**
* A handler deserializes the message of JVM Metrics and pushes it to downstream.
......@@ -37,17 +43,42 @@ public class JVMMetricsHandler extends AbstractKafkaHandler {
private final NamingControl namingLengthControl;
private final JVMSourceDispatcher jvmSourceDispatcher;
public JVMMetricsHandler(ModuleManager moduleManager, KafkaFetcherConfig config) {
super(moduleManager, config);
this.jvmSourceDispatcher = new JVMSourceDispatcher(moduleManager);
this.namingLengthControl = moduleManager.find(CoreModule.NAME)
private final HistogramMetrics histogram;
private final HistogramMetrics histogramBatch;
private final CounterMetrics errorCounter;
public JVMMetricsHandler(ModuleManager manager, KafkaFetcherConfig config) {
super(manager, config);
this.jvmSourceDispatcher = new JVMSourceDispatcher(manager);
this.namingLengthControl = manager.find(CoreModule.NAME)
.provider()
.getService(NamingControl.class);
MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
histogram = metricsCreator.createHistogramMetric(
"meter_in_latency",
"The process latency of meter",
new MetricsTag.Keys("protocol"),
new MetricsTag.Values("kafka")
);
histogramBatch = metricsCreator.createHistogramMetric(
"meter_in_latency",
"The process latency of meter",
new MetricsTag.Keys("protocol"),
new MetricsTag.Values("kafka")
);
errorCounter = metricsCreator.createCounter(
"meter_analysis_error_count",
"The error number of meter analysis",
new MetricsTag.Keys("protocol"),
new MetricsTag.Values("kafka")
);
}
@Override
public void handle(final ConsumerRecord<String, Bytes> record) {
try {
try (Timer ignored = histogramBatch.createTimer()) {
JVMMetricCollection metrics = JVMMetricCollection.parseFrom(record.value().get());
if (log.isDebugEnabled()) {
......@@ -62,7 +93,12 @@ public class JVMMetricsHandler extends AbstractKafkaHandler {
builder.setServiceInstance(namingLengthControl.formatInstanceName(builder.getServiceInstance()));
builder.getMetricsList().forEach(jvmMetric -> {
jvmSourceDispatcher.sendMetric(builder.getService(), builder.getServiceInstance(), jvmMetric);
try (Timer timer = histogram.createTimer()) {
jvmSourceDispatcher.sendMetric(builder.getService(), builder.getServiceInstance(), jvmMetric);
} catch (Exception e) {
errorCounter.inc();
log.error(e.getMessage(), e);
}
});
} catch (Exception e) {
log.error("handle record failed", e);
......
......@@ -42,8 +42,8 @@ public class JsonLogHandler extends LogHandler {
}
@Override
protected String getProtocolName() {
return "kafka-fetcher-native-json";
protected String getDataFormat() {
return "json";
}
@Override
......
......@@ -50,12 +50,16 @@ public class LogHandler implements KafkaHandler {
.provider()
.getService(MetricsCreator.class);
histogram = metricsCreator.createHistogramMetric(
"log_in_latency", "The process latency of log",
new MetricsTag.Keys("protocol"), new MetricsTag.Values(getProtocolName())
"log_in_latency",
"The process latency of log",
new MetricsTag.Keys("protocol", "data_format"),
new MetricsTag.Values("kafka", getDataFormat())
);
errorCounter = metricsCreator.createCounter("log_analysis_error_count", "The error number of log analysis",
new MetricsTag.Keys("protocol"),
new MetricsTag.Values(getProtocolName())
errorCounter = metricsCreator.createCounter(
"log_analysis_error_count",
"The error number of log analysis",
new MetricsTag.Keys("protocol", "data_format"),
new MetricsTag.Values("kafka", getDataFormat())
);
}
......@@ -71,20 +75,17 @@ public class LogHandler implements KafkaHandler {
@Override
public void handle(final ConsumerRecord<String, Bytes> record) {
HistogramMetrics.Timer timer = histogram.createTimer();
try {
try (HistogramMetrics.Timer ignore = histogram.createTimer()) {
LogData logData = parseConsumerRecord(record);
logAnalyzerService.doAnalysis(logData);
} catch (Exception e) {
errorCounter.inc();
log.error(e.getMessage(), e);
} finally {
timer.finish();
}
}
protected String getProtocolName() {
return "kafka-fetcher-native-proto";
protected String getDataFormat() {
return "protobuf";
}
protected LogData parseConsumerRecord(ConsumerRecord<String, Bytes> record) throws Exception {
......
......@@ -38,29 +38,40 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
*/
@Slf4j
public class MeterServiceHandler extends AbstractKafkaHandler {
private IMeterProcessService processService;
private final IMeterProcessService processService;
private final HistogramMetrics histogram;
private final HistogramMetrics histogramBatch;
private final CounterMetrics errorCounter;
public MeterServiceHandler(ModuleManager manager, KafkaFetcherConfig config) {
super(manager, config);
this.processService = manager.find(AnalyzerModule.NAME).provider().getService(IMeterProcessService.class);
MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
.provider()
.getService(MetricsCreator.class);
histogram = metricsCreator.createHistogramMetric(
"meter_in_latency", "The process latency of meter",
new MetricsTag.Keys("protocol"), new MetricsTag.Values("kafka-fetcher")
"meter_in_latency",
"The process latency of meter",
new MetricsTag.Keys("protocol"),
new MetricsTag.Values("kafka")
);
errorCounter = metricsCreator.createCounter("meter_analysis_error_count", "The error number of meter analysis",
new MetricsTag.Keys("protocol"),
new MetricsTag.Values("kafka-fetcher")
histogramBatch = metricsCreator.createHistogramMetric(
"meter_batch_in_latency",
"The process latency of meter",
new MetricsTag.Keys("protocol"),
new MetricsTag.Values("kafka")
);
errorCounter = metricsCreator.createCounter(
"meter_analysis_error_count",
"The error number of meter analysis",
new MetricsTag.Keys("protocol"),
new MetricsTag.Values("kafka")
);
}
@Override
public void handle(final ConsumerRecord<String, Bytes> record) {
try {
try (HistogramMetrics.Timer timer = histogramBatch.createTimer()) {
MeterDataCollection meterDataCollection = MeterDataCollection.parseFrom(record.value().get());
MeterProcessor processor = processService.createProcessor();
meterDataCollection.getMeterDataList().forEach(meterData -> {
......@@ -72,7 +83,6 @@ public class MeterServiceHandler extends AbstractKafkaHandler {
}
});
processor.process();
} catch (Exception e) {
log.error("handle record failed", e);
}
......
......@@ -27,20 +27,44 @@ import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics.Timer;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag.Keys;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag.Values;
/**
* A handler deserializes the message of profiling snapshot and pushes it to downstream.
*/
@Slf4j
public class ProfileTaskHandler extends AbstractKafkaHandler {
private final HistogramMetrics histogram;
private final CounterMetrics errorCounter;
public ProfileTaskHandler(ModuleManager manager, KafkaFetcherConfig config) {
super(manager, config);
MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
histogram = metricsCreator.createHistogramMetric(
"profile_task_in_latency",
"The process latency of profile task",
new Keys("protocol"),
new Values("kafka")
);
errorCounter = metricsCreator.createCounter(
"profile_task_analysis_error_count",
"The error number of profile task process",
new Keys("protocol"),
new Values("kafka")
);
}
@Override
public void handle(final ConsumerRecord<String, Bytes> record) {
try {
try (Timer ignored = histogram.createTimer()) {
ThreadSnapshot snapshot = ThreadSnapshot.parseFrom(record.value().get());
if (log.isDebugEnabled()) {
log.debug(
......@@ -60,6 +84,7 @@ public class ProfileTaskHandler extends AbstractKafkaHandler {
RecordStreamProcessor.getInstance().in(snapshotRecord);
} catch (Exception e) {
errorCounter.inc();
log.error("handle record failed", e);
}
}
......
......@@ -78,11 +78,7 @@ public class ServiceManagementHandler extends AbstractKafkaHandler {
serviceInstanceUpdate.setName(instanceName);
if (log.isDebugEnabled()) {
log.debug(
"Service[{}] instance[{}] registered.",
serviceName,
instanceName
);
log.debug("Service[{}] instance[{}] registered.", serviceName, instanceName);
}
JsonObject properties = new JsonObject();
......@@ -107,11 +103,7 @@ public class ServiceManagementHandler extends AbstractKafkaHandler {
final String instanceName = namingLengthControl.formatInstanceName(request.getServiceInstance());
if (log.isDebugEnabled()) {
log.debug(
"A ping of Service[{}] instance[{}].",
serviceName,
instanceName
);
log.debug("A ping of Service[{}] instance[{}].", serviceName, instanceName);
}
ServiceInstanceUpdate serviceInstanceUpdate = new ServiceInstanceUpdate();
......
......@@ -41,8 +41,8 @@ public class TraceSegmentHandler extends AbstractKafkaHandler {
private final ISegmentParserService segmentParserService;
private HistogramMetrics histogram;
private CounterMetrics errorCounter;
private final HistogramMetrics histogram;
private final CounterMetrics errorCounter;
public TraceSegmentHandler(ModuleManager moduleManager, KafkaFetcherConfig config) {
super(moduleManager, config);
......@@ -57,19 +57,19 @@ public class TraceSegmentHandler extends AbstractKafkaHandler {
"trace_in_latency",
"The process latency of trace data",
new MetricsTag.Keys("protocol"),
new MetricsTag.Values("kafka-fetcher")
new MetricsTag.Values("kafka")
);
errorCounter = metricsCreator.createCounter(
"trace_analysis_error_count",
"The error number of trace analysis",
new MetricsTag.Keys("protocol"),
new MetricsTag.Values("kafka-fetcher")
new MetricsTag.Values("kafka")
);
}
@Override
public void handle(final ConsumerRecord<String, Bytes> record) {
try {
try (HistogramMetrics.Timer ignore = histogram.createTimer()) {
SegmentObject segment = SegmentObject.parseFrom(record.value().get());
if (log.isDebugEnabled()) {
log.debug(
......@@ -78,17 +78,9 @@ public class TraceSegmentHandler extends AbstractKafkaHandler {
segment.getServiceInstance()
);
}
HistogramMetrics.Timer timer = histogram.createTimer();
try {
segmentParserService.send(segment);
} catch (Exception e) {
errorCounter.inc();
log.error(e.getMessage(), e);
} finally {
timer.finish();
}
segmentParserService.send(segment);
} catch (InvalidProtocolBufferException e) {
errorCounter.inc();
log.error("handle record failed", e);
}
}
......
......@@ -41,6 +41,9 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.mock.MockModuleManager;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.mock.MockModuleProvider;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.none.MetricsCreatorNoop;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
......@@ -82,6 +85,12 @@ public class JVMMetricsHandlerTest {
registerServiceImplementation(SourceReceiver.class, SOURCE_RECEIVER);
}
});
register(TelemetryModule.NAME, () -> new MockModuleProvider() {
@Override
protected void register() {
registerServiceImplementation(MetricsCreator.class, new MetricsCreatorNoop());
}
});
}
};
handler = new JVMMetricsHandler(manager, config);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册