diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 1fac0efdf32f8db10ecbd1c9d5eee3a1c730bbd0..6dc48e5566d22e4be404660ef46856974e62f917 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -301,14 +301,16 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } - Set messagesToReplayNow = getMessagesToReplayNow(messagesToRead); - - if (!messagesToReplayNow.isEmpty()) { - if (havePendingReplayRead) { + if (havePendingReplayRead) { + if (log.isDebugEnabled()) { log.debug("[{}] Skipping replay while awaiting previous read to complete", name); - return; } + return; + } + + Set messagesToReplayNow = getMessagesToReplayNow(messagesToRead); + if (!messagesToReplayNow.isEmpty()) { if (log.isDebugEnabled()) { log.debug("[{}] Schedule replay of {} messages for {} consumers", name, messagesToReplayNow.size(), consumerList.size()); @@ -454,7 +456,6 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } protected void sendMessagesToConsumers(ReadType readType, List entries) { - if (entries == null || entries.size() == 0) { return; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java index 6dab836c2b033440d08cbcb134901db22dd6cd24..3cd2e255e792052060a86d5a2558d35f45adbdf5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java @@ -208,4 +208,67 @@ public class DelayedDeliveryTest extends ProducerConsumerBase { assertTrue(receivedMsgs.contains("msg-" + i)); } } + + @Test + public void testDelayedDeliveryWithMultipleConcurrentReadEntries() + throws Exception { + String topic = "persistent://public/default/testDelayedDelivery-" + System.nanoTime(); + + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("shared-sub") + .subscriptionType(SubscriptionType.Shared) + .receiverQueueSize(1) // Use small prefecthing to simulate the multiple read batches + .subscribe(); + + // Simulate race condition with high frequency of calls to dispatcher.readMoreEntries() + PersistentDispatcherMultipleConsumers d = (PersistentDispatcherMultipleConsumers) ((PersistentTopic) pulsar + .getBrokerService().getTopicReference(topic).get()).getSubscription("shared-sub").getDispatcher(); + Thread t = new Thread(() -> { + while (true) { + synchronized (d) { + d.readMoreEntries(); + } + + try { + Thread.sleep(1); + } catch (InterruptedException e) { + return; + } + } + }); + t.start(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + + final int N = 1000; + + for (int i = 0; i < N; i++) { + producer.newMessage() + .value("msg-" + i) + .deliverAfter(5, TimeUnit.SECONDS) + .sendAsync(); + } + + producer.flush(); + + Message msg = consumer.receive(100, TimeUnit.MILLISECONDS); + assertNull(msg); + + Set receivedMsgs = new TreeSet<>(); + for (int i = 0; i < N; i++) { + msg = consumer.receive(10, TimeUnit.SECONDS); + receivedMsgs.add(msg.getValue()); + } + + assertEquals(receivedMsgs.size(), N); + for (int i = 0; i < N; i++) { + assertTrue(receivedMsgs.contains("msg-" + i)); + } + t.interrupt(); + } }