未验证 提交 f6a0efde 编写于 作者: D Darcy 提交者: GitHub

Fix bug for Kafka report plugin (#7016)

上级 e9b9be4a
......@@ -120,6 +120,7 @@ public class KafkaProducerManager implements BootService, Runnable {
.collect(Collectors.toSet());
if (!topics.isEmpty()) {
LOGGER.warn("kafka topics {} is not exist, connect to kafka cluster abort", topics);
closeAdminClient(adminClient);
return;
}
......@@ -127,6 +128,7 @@ public class KafkaProducerManager implements BootService, Runnable {
producer = new KafkaProducer<>(properties, new StringSerializer(), new BytesSerializer());
} catch (Exception e) {
LOGGER.error(e, "connect to kafka cluster '{}' failed", KafkaReporterPluginConfig.Plugin.Kafka.BOOTSTRAP_SERVERS);
closeAdminClient(adminClient);
return;
}
//notify listeners to send data if no exception been throw
......@@ -140,6 +142,16 @@ public class KafkaProducerManager implements BootService, Runnable {
}
}
private void closeAdminClient(AdminClient adminClient) {
if (adminClient != null) {
try {
adminClient.close();
} catch (Exception e) {
LOGGER.error("close kafka admin client failed", e);
}
}
}
/**
* Get the KafkaProducer instance to send data to Kafka broker.
*/
......@@ -162,4 +174,4 @@ public class KafkaProducerManager implements BootService, Runnable {
producer.flush();
producer.close();
}
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册