diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java index f9bccc56ecf20cce1d3d8baeed297fca358ec95c..244a1610c5e9cf8bd3e597ac8023122f045dd511 100644 --- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java +++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java @@ -120,6 +120,7 @@ public class KafkaProducerManager implements BootService, Runnable { .collect(Collectors.toSet()); if (!topics.isEmpty()) { LOGGER.warn("kafka topics {} is not exist, connect to kafka cluster abort", topics); + closeAdminClient(adminClient); return; } @@ -127,6 +128,7 @@ public class KafkaProducerManager implements BootService, Runnable { producer = new KafkaProducer<>(properties, new StringSerializer(), new BytesSerializer()); } catch (Exception e) { LOGGER.error(e, "connect to kafka cluster '{}' failed", KafkaReporterPluginConfig.Plugin.Kafka.BOOTSTRAP_SERVERS); + closeAdminClient(adminClient); return; } //notify listeners to send data if no exception been throw @@ -140,6 +142,16 @@ public class KafkaProducerManager implements BootService, Runnable { } } + private void closeAdminClient(AdminClient adminClient) { + if (adminClient != null) { + try { + adminClient.close(); + } catch (Exception e) { + LOGGER.error("close kafka admin client failed", e); + } + } + } + /** * Get the KafkaProducer instance to send data to Kafka broker. */ @@ -162,4 +174,4 @@ public class KafkaProducerManager implements BootService, Runnable { producer.flush(); producer.close(); } -} +} \ No newline at end of file