diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java index d46d3b38bf76292f65e45b035e4a3ce19a2bc40d..e34d1a1b4d33eb097607748e9ce60eb4fee8e05a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java @@ -34,6 +34,11 @@ class BatchMessageAcker { return new BatchMessageAcker(bitSet, batchSize); } + // Use the param bitSet as the BatchMessageAcker's bitSet, don't care about the batchSize. + static BatchMessageAcker newAcker(BitSet bitSet) { + return new BatchMessageAcker(bitSet, -1); + } + // bitset shared across messages in the same batch. private final int batchSize; private final BitSet bitSet; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index b1df6f8c82cf4764c0bf4038143459e03b5147de..cfaaa89e76ffce629d76e5adf238a426781d9f98 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1356,16 +1356,21 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle // create ack tracker for entry aka batch MessageIdImpl batchMessage = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex()); - BatchMessageAcker acker = BatchMessageAcker.newAcker(batchSize); List> possibleToDeadLetter = null; if (deadLetterPolicy != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) { possibleToDeadLetter = new ArrayList<>(); } - int skippedMessages = 0; + + BatchMessageAcker acker; BitSetRecyclable ackBitSet = null; if (ackSet != null && ackSet.size() > 0) { ackBitSet = BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet)); + acker = BatchMessageAcker.newAcker(BitSet.valueOf(SafeCollectionUtils.longListToArray(ackSet))); + } else { + acker = BatchMessageAcker.newAcker(batchSize); } + + int skippedMessages = 0; try { int startBatchIndex = Math.max(messageId.getBatchIndex(), 0); for (int i = startBatchIndex; i < batchSize; ++i) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index 6a4deef46cc66a0908a93c69096799f1c4f554a9..fd61c42ba6134ba4e9086c5ebd70aed98e6f1e99 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -168,8 +168,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments } else if (ackType == AckType.Individual) { ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent( new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), msgId.getPartitionIndex()), (v) -> { - ConcurrentBitSetRecyclable value = ConcurrentBitSetRecyclable.create(); - value.set(0, batchSize); + ConcurrentBitSetRecyclable value; + if (msgId.getAcker() != null && !(msgId.getAcker() instanceof BatchMessageAckerDisabled)) { + value = ConcurrentBitSetRecyclable.create(msgId.getAcker().getBitSet()); + } else { + value = ConcurrentBitSetRecyclable.create(); + value.set(0, batchSize); + } return value; }); bitSet.clear(batchIndex); @@ -221,8 +226,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments if (cnx == null) { return false; } - BitSetRecyclable bitSet = BitSetRecyclable.create(); - bitSet.set(0, batchSize); + BitSetRecyclable bitSet; + if (msgId.getAcker() != null && !(msgId.getAcker() instanceof BatchMessageAckerDisabled)) { + bitSet = BitSetRecyclable.valueOf(msgId.getAcker().getBitSet().toLongArray()); + } else { + bitSet = BitSetRecyclable.create(); + bitSet.set(0, batchSize); + } if (ackType == AckType.Cumulative) { bitSet.clear(0, batchIndex + 1); } else { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java index 2bfa620d43ad9c1f86fb6b8bd464ddb38f74abc2..8c1565eb2cb2cdcc577aef0815d309255102d538 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java @@ -22,9 +22,12 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.BitSet; + public class BatchMessageAckerTest { private static final int BATCH_SIZE = 10; @@ -68,4 +71,13 @@ public class BatchMessageAckerTest { assertEquals(0, acker.getOutstandingAcks()); } + @Test + public void testBitSetAcker() { + BitSet bitSet = BitSet.valueOf(acker.getBitSet().toLongArray()); + BatchMessageAcker bitSetAcker = BatchMessageAcker.newAcker(bitSet); + + Assert.assertEquals(acker.getBitSet(), bitSetAcker.getBitSet()); + Assert.assertEquals(acker.getOutstandingAcks(), bitSetAcker.getOutstandingAcks()); + } + } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java index 8e787c1ea6ebb33f611cd371c81e0d2d7f2bb817..21ee42bb9c04378279cee3534f6d06a580f3b1b1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java @@ -20,6 +20,7 @@ package org.apache.pulsar.common.util.collections; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; +import java.util.BitSet; /** * Safe multithreaded version of {@code BitSet} and leverage netty recycler. @@ -43,6 +44,12 @@ public class ConcurrentBitSetRecyclable extends ConcurrentBitSet { return RECYCLER.get(); } + public static ConcurrentBitSetRecyclable create(BitSet bitSet) { + ConcurrentBitSetRecyclable recyclable = RECYCLER.get(); + recyclable.or(bitSet); + return recyclable; + } + public void recycle() { this.clear(); recyclerHandle.recycle(this); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java index b037c705fae1ee43da0d341765789edb16386925..e9371767f2a3392aff5fe555721eb9ffd0c3cd5e 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.common.util.collections; +import java.util.BitSet; import org.testng.Assert; import org.testng.annotations.Test; @@ -34,4 +35,26 @@ public class ConcurrentBitSetRecyclableTest { Assert.assertFalse(bitset2.get(3)); Assert.assertNotSame(bitset3, bitset1); } + + @Test + public void testGenerateByBitSet() { + BitSet bitSet = new BitSet(); + ConcurrentBitSetRecyclable bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet); + Assert.assertEquals(bitSet, bitSetRecyclable); + + bitSet.set(0, 10); + bitSetRecyclable.recycle(); + bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet); + Assert.assertEquals(bitSet, bitSetRecyclable); + + bitSet.clear(5); + bitSetRecyclable.recycle(); + bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet); + Assert.assertEquals(bitSet, bitSetRecyclable); + + bitSet.clear(); + bitSetRecyclable.recycle(); + bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet); + Assert.assertEquals(bitSet, bitSetRecyclable); + } }