提交 13ea25a3 编写于 作者: R Rajan Dhabalia 提交者: Ali Ahmed

[pulsar-broker] Fix: race condition : Failed to read-more entries on dispatcher (#5391)

* [pulsar-broker] Fix: race condition : Failed to read-more entries on dispatcher

* clean up non-used method
上级 81e79500
......@@ -24,6 +24,7 @@ import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAG
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
......@@ -299,14 +300,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
}
if (hasMessagesToReplay()) {
Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
if (!messagesToReplayNow.isEmpty()) {
if (havePendingReplayRead) {
log.debug("[{}] Skipping replay while awaiting previous read to complete", name);
return;
}
Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule replay of {} messages for {} consumers", name, messagesToReplayNow.size(),
consumerList.size());
......@@ -728,29 +729,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, msgMetadata.getDeliverAtTime());
}
/**
* Returns whether we have any message that could be immediately replayed.
* This could be a message that was requested to be re-delivered or a delayed
* delivery.
*/
private boolean hasMessagesToReplay() {
if (!messagesToRedeliver.isEmpty()) {
return true;
}
if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) {
return true;
}
return false;
}
private synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
if (!messagesToRedeliver.isEmpty()) {
return messagesToRedeliver.items(maxMessagesToRead,
(ledgerId, entryId) -> new PositionImpl(ledgerId, entryId));
} else {
} else if (delayedDeliveryTracker.isPresent()) {
return delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
} else {
return Collections.emptySet();
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册