提交 997d8f62 编写于 作者: M Matteo Merli 提交者: GitHub

Synchronized producer cleanup during close operation (#34)

Fixes #33: Intermittent test failure on BacklogQuotaManagerTest.testAheadProducerOnHoldTimeout
上级 7d317bd3
......@@ -371,9 +371,13 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
if (!isConnected()) {
log.info("[{}] [{}] Closed Producer (not connected)", topic, producerName);
state.set(State.Closed);
client.cleanupProducer(this);
pendingMessages.forEach(msg -> msg.cmd.release());
synchronized (this) {
state.set(State.Closed);
client.cleanupProducer(this);
pendingMessages.forEach(msg -> msg.cmd.release());
pendingMessages.clear();
}
return CompletableFuture.completedFuture(null);
}
......@@ -398,9 +402,13 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
if (exception == null || !cnx.ctx().channel().isActive()) {
// Either we've received the success response for the close producer command from the broker, or the
// connection did break in the meantime. In any case, the producer is gone.
log.info("[{}] [{}] Closed Producer", topic, producerName);
state.set(State.Closed);
pendingMessages.forEach(msg -> msg.cmd.release());
synchronized (ProducerImpl.this) {
log.info("[{}] [{}] Closed Producer", topic, producerName);
state.set(State.Closed);
pendingMessages.forEach(msg -> msg.cmd.release());
pendingMessages.clear();
}
closeFuture.complete(null);
client.cleanupProducer(this);
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册