From 38c182e4c00902e5a6cda49c557f8fdbccab011c Mon Sep 17 00:00:00 2001 From: Daming Date: Wed, 2 Jun 2021 12:56:00 +0800 Subject: [PATCH] Kafka transporter code polish (#7032) --- .../core/kafka/KafkaProducerManager.java | 74 ++++++++----------- .../kafka/KafkaFetcherHandlerRegister.java | 9 +-- .../provider/handler/JVMMetricsHandler.java | 48 ++++++++++-- .../provider/handler/JsonLogHandler.java | 4 +- .../kafka/provider/handler/LogHandler.java | 25 ++++--- .../provider/handler/MeterServiceHandler.java | 30 +++++--- .../provider/handler/ProfileTaskHandler.java | 27 ++++++- .../handler/ServiceManagementHandler.java | 12 +-- .../provider/handler/TraceSegmentHandler.java | 22 ++---- .../handler/JVMMetricsHandlerTest.java | 9 +++ 10 files changed, 156 insertions(+), 104 deletions(-) diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java index 244a1610c5..695b51d4eb 100644 --- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java +++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java @@ -101,39 +101,39 @@ public class KafkaProducerManager implements BootService, Runnable { ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaReporterPluginConfig.Plugin.Kafka.BOOTSTRAP_SERVERS); KafkaReporterPluginConfig.Plugin.Kafka.PRODUCER_CONFIG.forEach(properties::setProperty); - AdminClient adminClient = AdminClient.create(properties); - DescribeTopicsResult topicsResult = adminClient.describeTopics(topics); - Set topics = topicsResult.values().entrySet().stream() - .map(entry -> { - try { - entry.getValue().get( - KafkaReporterPluginConfig.Plugin.Kafka.GET_TOPIC_TIMEOUT, - TimeUnit.SECONDS - ); - return null; - } catch (InterruptedException | ExecutionException | TimeoutException e) { - LOGGER.error(e, "Get KAFKA topic:{} error.", entry.getKey()); - } - return entry.getKey(); - }) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); - if (!topics.isEmpty()) { - LOGGER.warn("kafka topics {} is not exist, connect to kafka cluster abort", topics); - closeAdminClient(adminClient); - return; - } - - try { - producer = new KafkaProducer<>(properties, new StringSerializer(), new BytesSerializer()); - } catch (Exception e) { - LOGGER.error(e, "connect to kafka cluster '{}' failed", KafkaReporterPluginConfig.Plugin.Kafka.BOOTSTRAP_SERVERS); - closeAdminClient(adminClient); - return; + try (AdminClient adminClient = AdminClient.create(properties)) { + DescribeTopicsResult topicsResult = adminClient.describeTopics(topics); + Set topics = topicsResult.values().entrySet().stream() + .map(entry -> { + try { + entry.getValue().get( + KafkaReporterPluginConfig.Plugin.Kafka.GET_TOPIC_TIMEOUT, + TimeUnit.SECONDS + ); + return null; + } catch (InterruptedException | ExecutionException | TimeoutException e) { + LOGGER.error(e, "Get KAFKA topic:{} error.", entry.getKey()); + } + return entry.getKey(); + }) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + + if (!topics.isEmpty()) { + LOGGER.warn("kafka topics {} is not exist, connect to kafka cluster abort", topics); + return; + } + + try { + producer = new KafkaProducer<>(properties, new StringSerializer(), new BytesSerializer()); + } catch (Exception e) { + LOGGER.error(e, "connect to kafka cluster '{}' failed", KafkaReporterPluginConfig.Plugin.Kafka.BOOTSTRAP_SERVERS); + return; + } + //notify listeners to send data if no exception been throw + notifyListeners(KafkaConnectionStatus.CONNECTED); + bootProducerFuture.cancel(true); } - //notify listeners to send data if no exception been throw - notifyListeners(KafkaConnectionStatus.CONNECTED); - bootProducerFuture.cancel(true); } private void notifyListeners(KafkaConnectionStatus status) { @@ -142,16 +142,6 @@ public class KafkaProducerManager implements BootService, Runnable { } } - private void closeAdminClient(AdminClient adminClient) { - if (adminClient != null) { - try { - adminClient.close(); - } catch (Exception e) { - LOGGER.error("close kafka admin client failed", e); - } - } - } - /** * Get the KafkaProducer instance to send data to Kafka broker. */ diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/KafkaFetcherHandlerRegister.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/KafkaFetcherHandlerRegister.java index 63a3c0ea0a..f221e73969 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/KafkaFetcherHandlerRegister.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/KafkaFetcherHandlerRegister.java @@ -21,7 +21,6 @@ package org.apache.skywalking.oap.server.analyzer.agent.kafka; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import java.time.Duration; -import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.Properties; @@ -92,7 +91,7 @@ public class KafkaFetcherHandlerRegister implements Runnable { try { entry.getValue().get(); return null; - } catch (InterruptedException | ExecutionException e) { + } catch (InterruptedException | ExecutionException ignore) { } return entry.getKey(); }) @@ -132,7 +131,7 @@ public class KafkaFetcherHandlerRegister implements Runnable { consumer = new KafkaConsumer<>(properties, new StringDeserializer(), new BytesDeserializer()); executor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 60, TimeUnit.SECONDS, - new ArrayBlockingQueue(threadPoolQueueSize), + new ArrayBlockingQueue<>(threadPoolQueueSize), new CustomThreadFactory("KafkaConsumer"), new ThreadPoolExecutor.CallerRunsPolicy() ); @@ -160,9 +159,7 @@ public class KafkaFetcherHandlerRegister implements Runnable { try { ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(500L)); if (!consumerRecords.isEmpty()) { - Iterator> iterator = consumerRecords.iterator(); - while (iterator.hasNext()) { - ConsumerRecord record = iterator.next(); + for (final ConsumerRecord record : consumerRecords) { executor.submit(() -> handlerMap.get(record.topic()).handle(record)); } if (!enableKafkaMessageAutoCommit) { diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JVMMetricsHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JVMMetricsHandler.java index f37f7440e2..0a5365f5d2 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JVMMetricsHandler.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JVMMetricsHandler.java @@ -27,6 +27,12 @@ import org.apache.skywalking.oap.server.analyzer.provider.jvm.JVMSourceDispatche import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.config.NamingControl; import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.telemetry.TelemetryModule; +import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; +import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics; +import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics.Timer; +import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; +import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; /** * A handler deserializes the message of JVM Metrics and pushes it to downstream. @@ -37,17 +43,42 @@ public class JVMMetricsHandler extends AbstractKafkaHandler { private final NamingControl namingLengthControl; private final JVMSourceDispatcher jvmSourceDispatcher; - public JVMMetricsHandler(ModuleManager moduleManager, KafkaFetcherConfig config) { - super(moduleManager, config); - this.jvmSourceDispatcher = new JVMSourceDispatcher(moduleManager); - this.namingLengthControl = moduleManager.find(CoreModule.NAME) + private final HistogramMetrics histogram; + private final HistogramMetrics histogramBatch; + private final CounterMetrics errorCounter; + + public JVMMetricsHandler(ModuleManager manager, KafkaFetcherConfig config) { + super(manager, config); + this.jvmSourceDispatcher = new JVMSourceDispatcher(manager); + this.namingLengthControl = manager.find(CoreModule.NAME) .provider() .getService(NamingControl.class); + MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME) + .provider() + .getService(MetricsCreator.class); + histogram = metricsCreator.createHistogramMetric( + "meter_in_latency", + "The process latency of meter", + new MetricsTag.Keys("protocol"), + new MetricsTag.Values("kafka") + ); + histogramBatch = metricsCreator.createHistogramMetric( + "meter_in_latency", + "The process latency of meter", + new MetricsTag.Keys("protocol"), + new MetricsTag.Values("kafka") + ); + errorCounter = metricsCreator.createCounter( + "meter_analysis_error_count", + "The error number of meter analysis", + new MetricsTag.Keys("protocol"), + new MetricsTag.Values("kafka") + ); } @Override public void handle(final ConsumerRecord record) { - try { + try (Timer ignored = histogramBatch.createTimer()) { JVMMetricCollection metrics = JVMMetricCollection.parseFrom(record.value().get()); if (log.isDebugEnabled()) { @@ -62,7 +93,12 @@ public class JVMMetricsHandler extends AbstractKafkaHandler { builder.setServiceInstance(namingLengthControl.formatInstanceName(builder.getServiceInstance())); builder.getMetricsList().forEach(jvmMetric -> { - jvmSourceDispatcher.sendMetric(builder.getService(), builder.getServiceInstance(), jvmMetric); + try (Timer timer = histogram.createTimer()) { + jvmSourceDispatcher.sendMetric(builder.getService(), builder.getServiceInstance(), jvmMetric); + } catch (Exception e) { + errorCounter.inc(); + log.error(e.getMessage(), e); + } }); } catch (Exception e) { log.error("handle record failed", e); diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JsonLogHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JsonLogHandler.java index 83f2197726..d9a88e7103 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JsonLogHandler.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JsonLogHandler.java @@ -42,8 +42,8 @@ public class JsonLogHandler extends LogHandler { } @Override - protected String getProtocolName() { - return "kafka-fetcher-native-json"; + protected String getDataFormat() { + return "json"; } @Override diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandler.java index 3a9c07d040..6ec4af56eb 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandler.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandler.java @@ -50,12 +50,16 @@ public class LogHandler implements KafkaHandler { .provider() .getService(MetricsCreator.class); histogram = metricsCreator.createHistogramMetric( - "log_in_latency", "The process latency of log", - new MetricsTag.Keys("protocol"), new MetricsTag.Values(getProtocolName()) + "log_in_latency", + "The process latency of log", + new MetricsTag.Keys("protocol", "data_format"), + new MetricsTag.Values("kafka", getDataFormat()) ); - errorCounter = metricsCreator.createCounter("log_analysis_error_count", "The error number of log analysis", - new MetricsTag.Keys("protocol"), - new MetricsTag.Values(getProtocolName()) + errorCounter = metricsCreator.createCounter( + "log_analysis_error_count", + "The error number of log analysis", + new MetricsTag.Keys("protocol", "data_format"), + new MetricsTag.Values("kafka", getDataFormat()) ); } @@ -71,20 +75,17 @@ public class LogHandler implements KafkaHandler { @Override public void handle(final ConsumerRecord record) { - HistogramMetrics.Timer timer = histogram.createTimer(); - try { + try (HistogramMetrics.Timer ignore = histogram.createTimer()) { LogData logData = parseConsumerRecord(record); logAnalyzerService.doAnalysis(logData); } catch (Exception e) { errorCounter.inc(); log.error(e.getMessage(), e); - } finally { - timer.finish(); } } - - protected String getProtocolName() { - return "kafka-fetcher-native-proto"; + + protected String getDataFormat() { + return "protobuf"; } protected LogData parseConsumerRecord(ConsumerRecord record) throws Exception { diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/MeterServiceHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/MeterServiceHandler.java index 29cc518ac8..aa36144b1a 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/MeterServiceHandler.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/MeterServiceHandler.java @@ -38,29 +38,40 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; */ @Slf4j public class MeterServiceHandler extends AbstractKafkaHandler { - private IMeterProcessService processService; + private final IMeterProcessService processService; private final HistogramMetrics histogram; + private final HistogramMetrics histogramBatch; private final CounterMetrics errorCounter; public MeterServiceHandler(ModuleManager manager, KafkaFetcherConfig config) { super(manager, config); this.processService = manager.find(AnalyzerModule.NAME).provider().getService(IMeterProcessService.class); MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME) - .provider() - .getService(MetricsCreator.class); + .provider() + .getService(MetricsCreator.class); histogram = metricsCreator.createHistogramMetric( - "meter_in_latency", "The process latency of meter", - new MetricsTag.Keys("protocol"), new MetricsTag.Values("kafka-fetcher") + "meter_in_latency", + "The process latency of meter", + new MetricsTag.Keys("protocol"), + new MetricsTag.Values("kafka") ); - errorCounter = metricsCreator.createCounter("meter_analysis_error_count", "The error number of meter analysis", - new MetricsTag.Keys("protocol"), - new MetricsTag.Values("kafka-fetcher") + histogramBatch = metricsCreator.createHistogramMetric( + "meter_batch_in_latency", + "The process latency of meter", + new MetricsTag.Keys("protocol"), + new MetricsTag.Values("kafka") + ); + errorCounter = metricsCreator.createCounter( + "meter_analysis_error_count", + "The error number of meter analysis", + new MetricsTag.Keys("protocol"), + new MetricsTag.Values("kafka") ); } @Override public void handle(final ConsumerRecord record) { - try { + try (HistogramMetrics.Timer timer = histogramBatch.createTimer()) { MeterDataCollection meterDataCollection = MeterDataCollection.parseFrom(record.value().get()); MeterProcessor processor = processService.createProcessor(); meterDataCollection.getMeterDataList().forEach(meterData -> { @@ -72,7 +83,6 @@ public class MeterServiceHandler extends AbstractKafkaHandler { } }); processor.process(); - } catch (Exception e) { log.error("handle record failed", e); } diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ProfileTaskHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ProfileTaskHandler.java index e9ea1966e3..e188611167 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ProfileTaskHandler.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ProfileTaskHandler.java @@ -27,20 +27,44 @@ import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor; import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord; import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.telemetry.TelemetryModule; +import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; +import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics; +import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics.Timer; +import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; +import org.apache.skywalking.oap.server.telemetry.api.MetricsTag.Keys; +import org.apache.skywalking.oap.server.telemetry.api.MetricsTag.Values; /** * A handler deserializes the message of profiling snapshot and pushes it to downstream. */ @Slf4j public class ProfileTaskHandler extends AbstractKafkaHandler { + private final HistogramMetrics histogram; + private final CounterMetrics errorCounter; public ProfileTaskHandler(ModuleManager manager, KafkaFetcherConfig config) { super(manager, config); + MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME) + .provider() + .getService(MetricsCreator.class); + histogram = metricsCreator.createHistogramMetric( + "profile_task_in_latency", + "The process latency of profile task", + new Keys("protocol"), + new Values("kafka") + ); + errorCounter = metricsCreator.createCounter( + "profile_task_analysis_error_count", + "The error number of profile task process", + new Keys("protocol"), + new Values("kafka") + ); } @Override public void handle(final ConsumerRecord record) { - try { + try (Timer ignored = histogram.createTimer()) { ThreadSnapshot snapshot = ThreadSnapshot.parseFrom(record.value().get()); if (log.isDebugEnabled()) { log.debug( @@ -60,6 +84,7 @@ public class ProfileTaskHandler extends AbstractKafkaHandler { RecordStreamProcessor.getInstance().in(snapshotRecord); } catch (Exception e) { + errorCounter.inc(); log.error("handle record failed", e); } } diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandler.java index 3e491a48f4..03d8695107 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandler.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandler.java @@ -78,11 +78,7 @@ public class ServiceManagementHandler extends AbstractKafkaHandler { serviceInstanceUpdate.setName(instanceName); if (log.isDebugEnabled()) { - log.debug( - "Service[{}] instance[{}] registered.", - serviceName, - instanceName - ); + log.debug("Service[{}] instance[{}] registered.", serviceName, instanceName); } JsonObject properties = new JsonObject(); @@ -107,11 +103,7 @@ public class ServiceManagementHandler extends AbstractKafkaHandler { final String instanceName = namingLengthControl.formatInstanceName(request.getServiceInstance()); if (log.isDebugEnabled()) { - log.debug( - "A ping of Service[{}] instance[{}].", - serviceName, - instanceName - ); + log.debug("A ping of Service[{}] instance[{}].", serviceName, instanceName); } ServiceInstanceUpdate serviceInstanceUpdate = new ServiceInstanceUpdate(); diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/TraceSegmentHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/TraceSegmentHandler.java index 8277a227bd..4b7189a3ed 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/TraceSegmentHandler.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/TraceSegmentHandler.java @@ -41,8 +41,8 @@ public class TraceSegmentHandler extends AbstractKafkaHandler { private final ISegmentParserService segmentParserService; - private HistogramMetrics histogram; - private CounterMetrics errorCounter; + private final HistogramMetrics histogram; + private final CounterMetrics errorCounter; public TraceSegmentHandler(ModuleManager moduleManager, KafkaFetcherConfig config) { super(moduleManager, config); @@ -57,19 +57,19 @@ public class TraceSegmentHandler extends AbstractKafkaHandler { "trace_in_latency", "The process latency of trace data", new MetricsTag.Keys("protocol"), - new MetricsTag.Values("kafka-fetcher") + new MetricsTag.Values("kafka") ); errorCounter = metricsCreator.createCounter( "trace_analysis_error_count", "The error number of trace analysis", new MetricsTag.Keys("protocol"), - new MetricsTag.Values("kafka-fetcher") + new MetricsTag.Values("kafka") ); } @Override public void handle(final ConsumerRecord record) { - try { + try (HistogramMetrics.Timer ignore = histogram.createTimer()) { SegmentObject segment = SegmentObject.parseFrom(record.value().get()); if (log.isDebugEnabled()) { log.debug( @@ -78,17 +78,9 @@ public class TraceSegmentHandler extends AbstractKafkaHandler { segment.getServiceInstance() ); } - - HistogramMetrics.Timer timer = histogram.createTimer(); - try { - segmentParserService.send(segment); - } catch (Exception e) { - errorCounter.inc(); - log.error(e.getMessage(), e); - } finally { - timer.finish(); - } + segmentParserService.send(segment); } catch (InvalidProtocolBufferException e) { + errorCounter.inc(); log.error("handle record failed", e); } } diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JVMMetricsHandlerTest.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JVMMetricsHandlerTest.java index a9ff2f0465..8ac4824f25 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JVMMetricsHandlerTest.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JVMMetricsHandlerTest.java @@ -41,6 +41,9 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig; import org.apache.skywalking.oap.server.analyzer.agent.kafka.mock.MockModuleManager; import org.apache.skywalking.oap.server.analyzer.agent.kafka.mock.MockModuleProvider; +import org.apache.skywalking.oap.server.telemetry.TelemetryModule; +import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; +import org.apache.skywalking.oap.server.telemetry.none.MetricsCreatorNoop; import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; @@ -82,6 +85,12 @@ public class JVMMetricsHandlerTest { registerServiceImplementation(SourceReceiver.class, SOURCE_RECEIVER); } }); + register(TelemetryModule.NAME, () -> new MockModuleProvider() { + @Override + protected void register() { + registerServiceImplementation(MetricsCreator.class, new MetricsCreatorNoop()); + } + }); } }; handler = new JVMMetricsHandler(manager, config); -- GitLab