diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index bc5abf4d10a05c88fc18482444d3e82f645f133b..5102419d2b4f5d5bef6a48207f21d1401df1ae99 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -24,6 +24,7 @@ import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAG import com.google.common.collect.ComparisonChain; import com.google.common.collect.Lists; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.Set; @@ -296,14 +297,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } - if (hasMessagesToReplay()) { + Set messagesToReplayNow = getMessagesToReplayNow(messagesToRead); + + if (!messagesToReplayNow.isEmpty()) { if (havePendingReplayRead) { log.debug("[{}] Skipping replay while awaiting previous read to complete", name); return; } - Set messagesToReplayNow = getMessagesToReplayNow(messagesToRead); - if (log.isDebugEnabled()) { log.debug("[{}] Schedule replay of {} messages for {} consumers", name, messagesToReplayNow.size(), consumerList.size()); @@ -710,29 +711,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, msgMetadata.getDeliverAtTime()); } - /** - * Returns whether we have any message that could be immediately replayed. - * This could be a message that was requested to be re-delivered or a delayed - * delivery. - */ - private boolean hasMessagesToReplay() { - if (!messagesToRedeliver.isEmpty()) { - return true; - } - - if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) { - return true; - } - - return false; - } - private synchronized Set getMessagesToReplayNow(int maxMessagesToRead) { if (!messagesToRedeliver.isEmpty()) { return messagesToRedeliver.items(maxMessagesToRead, (ledgerId, entryId) -> new PositionImpl(ledgerId, entryId)); - } else { + } else if (delayedDeliveryTracker.isPresent()) { return delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead); + } else { + return Collections.emptySet(); } }