未验证 提交 81202e14 编写于 作者: R ran 提交者: GitHub

support use `BitSet` generate the `BatchMessageAcker` (#7909)

Motivation
Currently, we have to know the batchSize to generate BatchMessageAcker. If we could get the batch index ack bitSet from Broker we could generate the BatchMessageAcker by the bitSet, this is useful for consuming transaction messages, we don't need to change the protocol to get the total message number of one transaction.

Modifications
Add a new static method to generate the BatchMessageAcker by BitSet.
上级 af3a7afd
......@@ -34,6 +34,11 @@ class BatchMessageAcker {
return new BatchMessageAcker(bitSet, batchSize);
}
// Use the param bitSet as the BatchMessageAcker's bitSet, don't care about the batchSize.
static BatchMessageAcker newAcker(BitSet bitSet) {
return new BatchMessageAcker(bitSet, -1);
}
// bitset shared across messages in the same batch.
private final int batchSize;
private final BitSet bitSet;
......
......@@ -1356,16 +1356,21 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
// create ack tracker for entry aka batch
MessageIdImpl batchMessage = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
getPartitionIndex());
BatchMessageAcker acker = BatchMessageAcker.newAcker(batchSize);
List<MessageImpl<T>> possibleToDeadLetter = null;
if (deadLetterPolicy != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
possibleToDeadLetter = new ArrayList<>();
}
int skippedMessages = 0;
BatchMessageAcker acker;
BitSetRecyclable ackBitSet = null;
if (ackSet != null && ackSet.size() > 0) {
ackBitSet = BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet));
acker = BatchMessageAcker.newAcker(BitSet.valueOf(SafeCollectionUtils.longListToArray(ackSet)));
} else {
acker = BatchMessageAcker.newAcker(batchSize);
}
int skippedMessages = 0;
try {
int startBatchIndex = Math.max(messageId.getBatchIndex(), 0);
for (int i = startBatchIndex; i < batchSize; ++i) {
......
......@@ -168,8 +168,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
} else if (ackType == AckType.Individual) {
ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(
new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), msgId.getPartitionIndex()), (v) -> {
ConcurrentBitSetRecyclable value = ConcurrentBitSetRecyclable.create();
value.set(0, batchSize);
ConcurrentBitSetRecyclable value;
if (msgId.getAcker() != null && !(msgId.getAcker() instanceof BatchMessageAckerDisabled)) {
value = ConcurrentBitSetRecyclable.create(msgId.getAcker().getBitSet());
} else {
value = ConcurrentBitSetRecyclable.create();
value.set(0, batchSize);
}
return value;
});
bitSet.clear(batchIndex);
......@@ -221,8 +226,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
if (cnx == null) {
return false;
}
BitSetRecyclable bitSet = BitSetRecyclable.create();
bitSet.set(0, batchSize);
BitSetRecyclable bitSet;
if (msgId.getAcker() != null && !(msgId.getAcker() instanceof BatchMessageAckerDisabled)) {
bitSet = BitSetRecyclable.valueOf(msgId.getAcker().getBitSet().toLongArray());
} else {
bitSet = BitSetRecyclable.create();
bitSet.set(0, batchSize);
}
if (ackType == AckType.Cumulative) {
bitSet.clear(0, batchIndex + 1);
} else {
......
......@@ -22,9 +22,12 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.BitSet;
public class BatchMessageAckerTest {
private static final int BATCH_SIZE = 10;
......@@ -68,4 +71,13 @@ public class BatchMessageAckerTest {
assertEquals(0, acker.getOutstandingAcks());
}
@Test
public void testBitSetAcker() {
BitSet bitSet = BitSet.valueOf(acker.getBitSet().toLongArray());
BatchMessageAcker bitSetAcker = BatchMessageAcker.newAcker(bitSet);
Assert.assertEquals(acker.getBitSet(), bitSetAcker.getBitSet());
Assert.assertEquals(acker.getOutstandingAcks(), bitSetAcker.getOutstandingAcks());
}
}
......@@ -20,6 +20,7 @@ package org.apache.pulsar.common.util.collections;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.util.BitSet;
/**
* Safe multithreaded version of {@code BitSet} and leverage netty recycler.
......@@ -43,6 +44,12 @@ public class ConcurrentBitSetRecyclable extends ConcurrentBitSet {
return RECYCLER.get();
}
public static ConcurrentBitSetRecyclable create(BitSet bitSet) {
ConcurrentBitSetRecyclable recyclable = RECYCLER.get();
recyclable.or(bitSet);
return recyclable;
}
public void recycle() {
this.clear();
recyclerHandle.recycle(this);
......
......@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.common.util.collections;
import java.util.BitSet;
import org.testng.Assert;
import org.testng.annotations.Test;
......@@ -34,4 +35,26 @@ public class ConcurrentBitSetRecyclableTest {
Assert.assertFalse(bitset2.get(3));
Assert.assertNotSame(bitset3, bitset1);
}
@Test
public void testGenerateByBitSet() {
BitSet bitSet = new BitSet();
ConcurrentBitSetRecyclable bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet);
Assert.assertEquals(bitSet, bitSetRecyclable);
bitSet.set(0, 10);
bitSetRecyclable.recycle();
bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet);
Assert.assertEquals(bitSet, bitSetRecyclable);
bitSet.clear(5);
bitSetRecyclable.recycle();
bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet);
Assert.assertEquals(bitSet, bitSetRecyclable);
bitSet.clear();
bitSetRecyclable.recycle();
bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet);
Assert.assertEquals(bitSet, bitSetRecyclable);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册