From 467ffab15c862b240d805cf1b916777ba633392c Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 29 Oct 2019 19:16:06 -0700 Subject: [PATCH] Efficiency improvements for delay delivery tracker (#5498) Efficiency improvements for delay delivery tracker --- .../InMemoryDelayedDeliveryTracker.java | 12 +++++--- ...PersistentDispatcherMultipleConsumers.java | 24 ++++++++-------- .../delayed/InMemoryDeliveryTrackerTest.java | 28 +++++++++++++++++++ 3 files changed, 48 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index c692c3ac77a..447928486ab 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -71,8 +71,12 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", dispatcher.getName(), ledgerId, entryId, deliveryAt - now); } - if (deliveryAt < now) { - // It's already about time to deliver this message + if (deliveryAt < (now + tickTimeMillis)) { + // It's already about time to deliver this message. We add the buffer of + // `tickTimeMillis` because messages can be extracted from the tracker + // slightly before the expiration time. We don't want the messages to + // go back into the delay tracker (for a brief amount of time) when we're + // trying to dispatch to the consumer. return false; } @@ -117,7 +121,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T } if (log.isDebugEnabled()) { - log.debug("[{}] Get scheduled messags - found {}", dispatcher.getName(), positions.size()); + log.debug("[{}] Get scheduled messages - found {}", dispatcher.getName(), positions.size()); } updateTimer(); return positions; @@ -170,7 +174,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T @Override public void run(Timeout timeout) throws Exception { if (log.isDebugEnabled()) { - log.info("[{}] Timer triggered", dispatcher.getName()); + log.debug("[{}] Timer triggered", dispatcher.getName()); } if (timeout.isCancelled()) { return; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 3da7feb891a..1fac0efdf32 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import com.google.common.collect.Range; + import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -715,25 +716,28 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } @Override - public synchronized boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) { + public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) { if (!isDelayedDeliveryEnabled) { // If broker has the feature disabled, always deliver messages immediately return false; } - if (!delayedDeliveryTracker.isPresent()) { - // Initialize the tracker the first time we need to use it - delayedDeliveryTracker = Optional.of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this)); - } + synchronized (this) { + if (!delayedDeliveryTracker.isPresent()) { + // Initialize the tracker the first time we need to use it + delayedDeliveryTracker = Optional + .of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this)); + } - return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, msgMetadata.getDeliverAtTime()); + return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, msgMetadata.getDeliverAtTime()); + } } private synchronized Set getMessagesToReplayNow(int maxMessagesToRead) { if (!messagesToRedeliver.isEmpty()) { return messagesToRedeliver.items(maxMessagesToRead, (ledgerId, entryId) -> new PositionImpl(ledgerId, entryId)); - } else if (delayedDeliveryTracker.isPresent()) { + } else if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) { return delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead); } else { return Collections.emptySet(); @@ -741,11 +745,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } public synchronized long getNumberOfDelayedMessages() { - if (delayedDeliveryTracker.isPresent()) { - return delayedDeliveryTracker.get().getNumberOfDelayedMessages(); - } else { - return 0; - } + return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index b6f958d7da2..ddb3ba63bf9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -156,4 +156,32 @@ public class InMemoryDeliveryTrackerTest { task.run(mock(Timeout.class)); verify(dispatcher).readMoreEntries(); } + + /** + * Adding a message that is about to expire within the tick time should lead + * to a rejection from the tracker. + */ + @Test + public void testAddWithinTickTime() throws Exception { + PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); + + Timer timer = mock(Timer.class); + + AtomicLong clockTime = new AtomicLong(); + Clock clock = mock(Clock.class); + when(clock.millis()).then(x -> clockTime.get()); + + @Cleanup + InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock); + + clockTime.set(0); + + assertFalse(tracker.addMessage(1, 1, 10)); + assertFalse(tracker.addMessage(2, 2, 99)); + assertTrue(tracker.addMessage(3, 3, 100)); + assertTrue(tracker.addMessage(4, 4, 200)); + + assertEquals(tracker.getNumberOfDelayedMessages(), 2); + } + } -- GitLab