From d126b1d220f3a9878c637baece08699b7e7282c9 Mon Sep 17 00:00:00 2001 From: Rajan Dhabalia Date: Wed, 23 Oct 2019 04:06:26 -0700 Subject: [PATCH] [pulsar-broker] Fix: race condition : Failed to read-more entries on dispatcher (#5391) * [pulsar-broker] Fix: race condition : Failed to read-more entries on dispatcher * clean up non-used method (cherry picked from commit 13ea25a3e0b1d4b01ec678049964816a1275a95f) --- ...PersistentDispatcherMultipleConsumers.java | 28 +++++-------------- 1 file changed, 7 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index bc5abf4d10a..5102419d2b4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -24,6 +24,7 @@ import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAG import com.google.common.collect.ComparisonChain; import com.google.common.collect.Lists; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.Set; @@ -296,14 +297,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } - if (hasMessagesToReplay()) { + Set messagesToReplayNow = getMessagesToReplayNow(messagesToRead); + + if (!messagesToReplayNow.isEmpty()) { if (havePendingReplayRead) { log.debug("[{}] Skipping replay while awaiting previous read to complete", name); return; } - Set messagesToReplayNow = getMessagesToReplayNow(messagesToRead); - if (log.isDebugEnabled()) { log.debug("[{}] Schedule replay of {} messages for {} consumers", name, messagesToReplayNow.size(), consumerList.size()); @@ -710,29 +711,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, msgMetadata.getDeliverAtTime()); } - /** - * Returns whether we have any message that could be immediately replayed. - * This could be a message that was requested to be re-delivered or a delayed - * delivery. - */ - private boolean hasMessagesToReplay() { - if (!messagesToRedeliver.isEmpty()) { - return true; - } - - if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) { - return true; - } - - return false; - } - private synchronized Set getMessagesToReplayNow(int maxMessagesToRead) { if (!messagesToRedeliver.isEmpty()) { return messagesToRedeliver.items(maxMessagesToRead, (ledgerId, entryId) -> new PositionImpl(ledgerId, entryId)); - } else { + } else if (delayedDeliveryTracker.isPresent()) { return delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead); + } else { + return Collections.emptySet(); } } -- GitLab