diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index 03a68ba98aeab16ffa635109a377baea42640f6a..ae0426935637c580bc837534ab5c7c6fc48974a1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -20,9 +20,12 @@ package org.apache.bookkeeper.mledger; import com.google.common.annotations.Beta; import com.google.common.base.Predicate; +import com.google.common.collect.Range; + import java.util.List; import java.util.Map; import java.util.Set; + import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback; @@ -30,6 +33,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback; +import org.apache.bookkeeper.mledger.impl.PositionImpl; /** * A ManangedCursor is a persisted cursor inside a ManagedLedger. @@ -594,4 +598,14 @@ public interface ManagedCursor { */ ManagedLedger getManagedLedger(); + /** + * Get last individual deleted range + * @return range + */ + Range getLastIndividualDeletedRange(); + + /** + * Trim delete entries for the given entries + */ + void trimDeletedEntries(List entries); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 8a6f9a2c1479030e463214c2c8acd56c4dc2e9b5..cc425aee206df6d22db84eea0bf29c8c556c6ca0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1050,7 +1050,7 @@ public class ManagedCursorImpl implements ManagedCursor { positions.stream() .filter(position -> individualDeletedMessages.contains(((PositionImpl) position).getLedgerId(), ((PositionImpl) position).getEntryId()) - || ((PositionImpl) position).compareTo(markDeletePosition) < 0) + || ((PositionImpl) position).compareTo(markDeletePosition) <= 0) .forEach(alreadyAcknowledgedPositions::add); } finally { lock.readLock().unlock(); @@ -2590,5 +2590,16 @@ public class ManagedCursorImpl implements ManagedCursor { return this.ledger; } + @Override + public Range getLastIndividualDeletedRange() { + return individualDeletedMessages.lastRange(); + } + + @Override + public void trimDeletedEntries(List entries) { + entries.removeIf(entry -> ((PositionImpl) entry.getPosition()).compareTo(markDeletePosition) <= 0 + || individualDeletedMessages.contains(entry.getLedgerId(), entry.getEntryId())); + } + private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index c415320ddf5a424c214041ee2dca00a7672e46c4..21f5747919b2f50cba04ef01a848df11cf9821ec 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -26,6 +26,7 @@ import static org.testng.Assert.fail; import com.google.common.base.Predicate; import com.google.common.collect.Lists; +import com.google.common.collect.Range; import com.google.common.collect.Sets; import java.util.Collections; import java.util.List; @@ -315,6 +316,16 @@ public class ManagedCursorContainerTest { return null; } + @Override + public Range getLastIndividualDeletedRange() { + return null; + } + + @Override + public void trimDeletedEntries(List entries) { + + } + } @Test diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 70db43c9ff4e0aaea72a6019149955d31e95ec60..b0db9264d8f91031680bf9f26913d0e0e9b241d5 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -32,6 +32,7 @@ import static org.testng.Assert.fail; import com.google.common.base.Charsets; import com.google.common.collect.Lists; +import com.google.common.collect.Range; import com.google.common.collect.Sets; import java.lang.reflect.Field; import java.nio.charset.Charset; @@ -53,6 +54,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; + +import io.netty.buffer.ByteBufAllocator; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeper.DigestType; @@ -2225,6 +2228,62 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { } } + @Test(timeOut = 20000) + void testGetLastIndividualDeletedRange() throws Exception { + ManagedLedger ledger = factory.open("test_last_individual_deleted"); + + ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1"); + PositionImpl markDeletedPosition = (PositionImpl) c1.getMarkDeletedPosition(); + for(int i = 0; i < 10; i++) { + ledger.addEntry(("entry" + i).getBytes(Encoding)); + } + PositionImpl p1 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 1); + PositionImpl p2 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 2); + PositionImpl p3 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 5); + PositionImpl p4 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 6); + + c1.delete(Lists.newArrayList(p1, p2, p3, p4)); + + assertEquals(c1.getLastIndividualDeletedRange(), Range.openClosed(PositionImpl.get(p3.getLedgerId(), + p3.getEntryId() - 1), p4)); + + PositionImpl p5 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 8); + c1.delete(p5); + + assertEquals(c1.getLastIndividualDeletedRange(), Range.openClosed(PositionImpl.get(p5.getLedgerId(), + p5.getEntryId() - 1), p5)); + + } + + @Test(timeOut = 20000) + void testTrimDeletedEntries() throws ManagedLedgerException, InterruptedException { + ManagedLedger ledger = factory.open("my_test_ledger"); + + ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1"); + PositionImpl markDeletedPosition = (PositionImpl) c1.getMarkDeletedPosition(); + for(int i = 0; i < 10; i++) { + ledger.addEntry(("entry" + i).getBytes(Encoding)); + } + PositionImpl p1 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 1); + PositionImpl p2 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 2); + PositionImpl p3 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 5); + PositionImpl p4 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 6); + + c1.delete(Lists.newArrayList(p1, p2, p3, p4)); + + EntryImpl entry1 = EntryImpl.create(p1, ByteBufAllocator.DEFAULT.buffer(0)); + EntryImpl entry2 = EntryImpl.create(p2, ByteBufAllocator.DEFAULT.buffer(0)); + EntryImpl entry3 = EntryImpl.create(p3, ByteBufAllocator.DEFAULT.buffer(0)); + EntryImpl entry4 = EntryImpl.create(p4, ByteBufAllocator.DEFAULT.buffer(0)); + EntryImpl entry5 = EntryImpl.create(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 7, + ByteBufAllocator.DEFAULT.buffer(0)); + List entries = Lists.newArrayList(entry1, entry2, entry3, entry4, entry5); + c1.trimDeletedEntries(entries); + assertEquals(entries.size(), 1); + assertEquals(entries.get(0).getPosition(), PositionImpl.get(markDeletedPosition.getLedgerId() , + markDeletedPosition.getEntryId() + 7)); + } + @Test(timeOut = 20000) void outOfOrderAcks() throws Exception { ManagedLedger ledger = factory.open("outOfOrderAcks"); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java index cda9c0986eaa68ab0a2596a93e000c3e4afb5167..5e6e72e5096da143ac2e51e5f94fb4ef45a54b16 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java @@ -94,4 +94,8 @@ public interface Dispatcher { default long getNumberOfDelayedMessages() { return 0; } + + default void cursorIsReset() { + //No-op + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 4dff4c49dc11e1d51d26890945653428f73b2ac0..91e4ab7dc2841d52eb0929f3eeb186147a6b3e0e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -32,6 +32,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import com.google.common.collect.Range; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -72,6 +73,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul protected final PersistentTopic topic; protected final ManagedCursor cursor; + protected volatile Range lastIndividualDeletedRangeFromCursorRecovery; private CompletableFuture closeFuture = null; LongPairSet messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2); @@ -106,6 +108,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul super(subscription); this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration(); this.cursor = cursor; + this.lastIndividualDeletedRangeFromCursorRecovery = cursor.getLastIndividualDeletedRange(); this.name = topic.getName() + " / " + Codec.decode(cursor.getName()); this.topic = topic; this.redeliveryTracker = this.serviceConfig.isSubscriptionRedeliveryTrackerEnabled() @@ -431,6 +434,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } protected void sendMessagesToConsumers(ReadType readType, List entries) { + + if (entries == null || entries.size() == 0) { + return; + } + if (needTrimAckedMessages()) { + cursor.trimDeletedEntries(entries); + } int start = 0; int entriesToDispatch = entries.size(); long totalMessagesSent = 0; @@ -558,6 +568,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } + private boolean needTrimAckedMessages() { + if (lastIndividualDeletedRangeFromCursorRecovery == null) { + return false; + } else { + return lastIndividualDeletedRangeFromCursorRecovery.upperEndpoint() + .compareTo((PositionImpl) cursor.getReadPosition()) > 0; + } + } /** * returns true only if {@link consumerList} has atleast one unblocked consumer and have available permits @@ -726,6 +744,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } } + @Override + public void cursorIsReset() { + if (this.lastIndividualDeletedRangeFromCursorRecovery != null) { + this.lastIndividualDeletedRangeFromCursorRecovery = null; + } + } + public PersistentTopic getTopic() { return topic; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 20d64edf1ad0e6cf586a3d444c4b1c83f3b3fdd2..4a4e3aa4846dce1f78dea4796f6af1368602d398 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -651,6 +651,9 @@ public class PersistentSubscription implements Subscription { log.debug("[{}][{}] Successfully reset subscription to position {}", topicName, subName, finalPosition); } + if (dispatcher != null) { + dispatcher.cursorIsReset(); + } IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); future.complete(null); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java index 3af8fb7953b51f524de565367f2b91588a19fe36..0d141197679f3c32b8ef920d7dea4510c0cea189 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java @@ -222,12 +222,26 @@ public class ConcurrentOpenLongPairRangeSet> implements @Override public Range firstRange() { + if (rangeBitSetMap.isEmpty()) { + return null; + } Entry firstSet = rangeBitSetMap.firstEntry(); int lower = firstSet.getValue().nextSetBit(0); int upper = Math.max(lower, firstSet.getValue().nextClearBit(lower) - 1); return Range.openClosed(consumer.apply(firstSet.getKey(), lower - 1), consumer.apply(firstSet.getKey(), upper)); } + @Override + public Range lastRange() { + if (rangeBitSetMap.isEmpty()) { + return null; + } + Entry lastSet = rangeBitSetMap.lastEntry(); + int upper = lastSet.getValue().previousSetBit(lastSet.getValue().size()); + int lower = Math.min(lastSet.getValue().previousClearBit(upper), upper); + return Range.openClosed(consumer.apply(lastSet.getKey(), lower), consumer.apply(lastSet.getKey(), upper)); + } + @Override public int size() { if (updatedAfterCachedForSize) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java index 2187ffe0814deaeb6bcef6724732d168832387d2..0d19635fe360acfd3a65b5f466108d9117fb3461 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java @@ -19,10 +19,13 @@ package org.apache.pulsar.common.util.collections; import com.google.common.collect.ComparisonChain; +import com.google.common.collect.Lists; import com.google.common.collect.Range; import com.google.common.collect.RangeSet; import com.google.common.collect.TreeRangeSet; + import java.util.Collection; +import java.util.List; import java.util.Set; /** @@ -114,6 +117,13 @@ public interface LongPairRangeSet> { */ Range firstRange(); + /** + * It returns very last biggest range in the rangeSet. + * + * @return last biggest range into the set + */ + Range lastRange(); + /** * Represents a function that accepts two long arguments and produces a result. * @@ -259,6 +269,15 @@ public interface LongPairRangeSet> { return set.asRanges().iterator().next(); } + @Override + public Range lastRange() { + if (set.asRanges().isEmpty()) { + return null; + } + List> list = Lists.newArrayList(set.asRanges().iterator()); + return list.get(list.size() - 1); + } + @Override public int size() { return set.asRanges().size(); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java index 210b7e63f2b4414e7e7a2f86a01dd1c4a4142b56..4adf14ff7faee5ff44293be76d58ff891601ea7f 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java @@ -197,6 +197,7 @@ public class ConcurrentOpenLongPairRangeSetTest { @Test public void testFirstRange() { ConcurrentOpenLongPairRangeSet set = new ConcurrentOpenLongPairRangeSet<>(consumer); + assertNull(set.firstRange()); Range range = Range.openClosed(new LongPair(0, 97), new LongPair(0, 99)); set.add(range); assertEquals(set.firstRange(), range); @@ -211,6 +212,28 @@ public class ConcurrentOpenLongPairRangeSetTest { assertEquals(set.size(), 2); } + @Test + public void testLastRange() { + ConcurrentOpenLongPairRangeSet set = new ConcurrentOpenLongPairRangeSet<>(consumer); + assertNull(set.lastRange()); + Range range = Range.openClosed(new LongPair(0, 97), new LongPair(0, 99)); + set.add(range); + assertEquals(set.lastRange(), range); + assertEquals(set.size(), 1); + range = Range.openClosed(new LongPair(0, 98), new LongPair(0, 105)); + set.add(range); + assertEquals(set.lastRange(), Range.openClosed(new LongPair(0, 97), new LongPair(0, 105))); + assertEquals(set.size(), 1); + range = Range.openClosed(new LongPair(1, 5), new LongPair(1, 75)); + set.add(range); + assertEquals(set.lastRange(), range); + assertEquals(set.size(), 2); + range = Range.openClosed(new LongPair(1, 80), new LongPair(1, 120)); + set.add(range); + assertEquals(set.lastRange(), range); + assertEquals(set.size(), 3); + } + @Test public void testToString() { ConcurrentOpenLongPairRangeSet set = new ConcurrentOpenLongPairRangeSet<>(consumer);