未验证 提交 3ac98d8d 编写于 作者: R ran 提交者: GitHub

[Transaction] Support transaction abort on partition (#7953)

Fixes https://github.com/streamnative/pulsar/issues/1312

# Motivation

Currently, the transaction abort on partitions operation is not getting through.

### Modifications

Make the transaction abort on partitions operation get through.
上级 6a2b5745
......@@ -194,7 +194,7 @@ public class TransactionMetadataStoreService {
}
completableFuture = updateTxnStatus(txnID, newStatus, TxnStatus.OPEN)
.thenCompose(ignored -> endToTB(txnID, newStatus));
.thenCompose(ignored -> endToTB(txnID, txnAction));
if (TxnStatus.COMMITTING.equals(newStatus)) {
completableFuture = completableFuture
.thenCompose(ignored -> updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING));
......@@ -205,7 +205,7 @@ public class TransactionMetadataStoreService {
return completableFuture;
}
private CompletableFuture<Void> endToTB(TxnID txnID, TxnStatus newStatus) {
private CompletableFuture<Void> endToTB(TxnID txnID, int txnAction) {
CompletableFuture<Void> resultFuture = new CompletableFuture<>();
List<CompletableFuture<TxnID>> commitFutureList = new ArrayList<>();
this.getTxnMeta(txnID).whenComplete((txnMeta, throwable) -> {
......@@ -214,16 +214,15 @@ public class TransactionMetadataStoreService {
return;
}
txnMeta.producedPartitions().forEach(partition -> {
CompletableFuture<TxnID> commitFuture = new CompletableFuture<>();
if (TxnStatus.COMMITTING.equals(newStatus)) {
commitFuture = tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits());
} else if (TxnStatus.ABORTING.equals(newStatus)) {
commitFuture.completeExceptionally(new Throwable("Unsupported operation."));
CompletableFuture<TxnID> actionFuture = new CompletableFuture<>();
if (PulsarApi.TxnAction.COMMIT_VALUE == txnAction) {
actionFuture = tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits());
} else if (PulsarApi.TxnAction.ABORT_VALUE == txnAction) {
actionFuture = tbClient.abortTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits());
} else {
// Unsupported txnStatus
commitFuture.completeExceptionally(new Throwable("Unsupported txnStatus."));
actionFuture.completeExceptionally(new Throwable("Unsupported txnAction " + txnAction));
}
commitFutureList.add(commitFuture);
commitFutureList.add(actionFuture);
});
try {
FutureUtil.waitForAll(commitFutureList).whenComplete((ignored, waitThrowable) -> {
......
......@@ -1717,7 +1717,7 @@ public class ServerCnx extends PulsarHandler {
final int txnAction = command.getTxnAction().getNumber();
TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
service.getTopics().get(command.getTopic()).whenComplete((topic, t) -> {
service.getTopics().get(TopicName.get(command.getTopic()).toString()).whenComplete((topic, t) -> {
if (!topic.isPresent()) {
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
command.getRequestId(), ServerError.TopicNotFound,
......
......@@ -152,20 +152,15 @@ public class PersistentTransactionBuffer extends PersistentTopic implements Tran
@Override
public CompletableFuture<Void> endTxnOnPartition(TxnID txnID, int txnAction) {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
CompletableFuture<Void> future = new CompletableFuture<>();
if (PulsarApi.TxnAction.COMMIT_VALUE == txnAction) {
committingTxn(txnID).whenComplete((ignored, throwable) -> {
if (throwable != null) {
completableFuture.completeExceptionally(throwable);
return;
}
completableFuture.complete(null);
});
future = committingTxn(txnID);
} else if (PulsarApi.TxnAction.ABORT_VALUE == txnAction) {
// TODO handle abort operation
completableFuture.complete(null);
future = abortTxn(txnID);
} else {
future.completeExceptionally(new Exception("Unsupported txnAction " + txnAction));
}
return completableFuture;
return future;
}
private CompletableFuture<Void> committingTxn(TxnID txnID) {
......
......@@ -39,6 +39,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
......@@ -115,7 +116,6 @@ public class TransactionProduceTest extends TransactionTestBase {
.topic(TOPIC_OUTPUT)
.sendTimeout(0, TimeUnit.SECONDS)
.enableBatching(false)
.roundRobinRouterBatchingPartitionSwitchFrequency(1)
.create();
int messageCntPerPartition = 3;
......@@ -140,35 +140,12 @@ public class TransactionProduceTest extends TransactionTestBase {
}
// the messageId callback can't be called before commit
futureList.forEach(messageIdFuture -> {
try {
messageIdFuture.get(1, TimeUnit.SECONDS);
Assert.fail("MessageId shouldn't be get before txn commit.");
} catch (Exception e) {
if (e instanceof TimeoutException) {
log.info("This is a expected exception.");
} else {
log.error("This exception is not expected.", e);
Assert.fail("This exception is not expected.");
}
}
});
checkMessageId(futureList, false);
tnx.commit().get();
Thread.sleep(3000L);
// the messageId callback should be called after commit
futureList.forEach(messageIdFuture -> {
try {
MessageId messageId = messageIdFuture.get(1, TimeUnit.SECONDS);
Assert.assertNotNull(messageId);
log.info("Tnx commit success! messageId: {}", messageId);
} catch (Exception e) {
log.error("Tnx commit failed! tnx: " + tnx, e);
Assert.fail("Tnx commit failed! tnx: " + tnx);
}
});
checkMessageId(futureList, true);
for (int i = 0; i < TOPIC_PARTITION; i++) {
// the target topic partition received the commit marker
......@@ -220,13 +197,116 @@ public class TransactionProduceTest extends TransactionTestBase {
System.out.println("finish test");
}
@Test
public void produceAndAbortTest() throws Exception {
String topic = NAMESPACE1 + "/produce-abort-test";
PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient;
TransactionImpl txn = (TransactionImpl) pulsarClientImpl.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build().get();
@Cleanup
ProducerImpl<byte[]> outProducer = (ProducerImpl<byte[]>) pulsarClientImpl
.newProducer()
.topic(topic)
.sendTimeout(0, TimeUnit.SECONDS)
.enableBatching(false)
.create();
int messageCnt = 10;
Set<String> messageSet = new HashSet<>();
List<CompletableFuture<MessageId>> futureList = new ArrayList<>();
for (int i = 0; i < messageCnt; i++) {
String msg = "Hello Txn - " + i;
messageSet.add(msg);
CompletableFuture<MessageId> produceFuture = outProducer
.newMessage(txn).value(msg.getBytes(UTF_8)).sendAsync();
futureList.add(produceFuture);
}
// the target topic hasn't the abort marker before commit
ReadOnlyCursor originTopicCursor = getOriginTopicCursor(topic, -1);
Assert.assertNotNull(originTopicCursor);
Assert.assertFalse(originTopicCursor.hasMoreEntries());
originTopicCursor.close();
// the messageId callback can't be called before commit
checkMessageId(futureList, false);
txn.abort().get();
// the messageId callback should be called after commit
checkMessageId(futureList, true);
// the target topic partition doesn't have any entries
originTopicCursor = getOriginTopicCursor(topic, -1);
Assert.assertNotNull(originTopicCursor);
Assert.assertFalse(originTopicCursor.hasMoreEntries());
// the target topic transactionBuffer should receive the transaction messages,
// committing marker and commit marker
ReadOnlyCursor tbTopicCursor = getTBTopicCursor(topic, -1);
Assert.assertNotNull(tbTopicCursor);
Assert.assertTrue(tbTopicCursor.hasMoreEntries());
long tbEntriesCnt = tbTopicCursor.getNumberOfEntries();
log.info("transaction buffer entries count: {}", tbEntriesCnt);
Assert.assertEquals(tbEntriesCnt, messageCnt + 1);
PulsarApi.MessageMetadata messageMetadata;
List<Entry> entries = tbTopicCursor.readEntries((int) tbEntriesCnt);
// check the messages
for (int i = 0; i < messageCnt; i++) {
messageMetadata = Commands.parseMessageMetadata(entries.get(i).getDataBuffer());
Assert.assertEquals(messageMetadata.getTxnidMostBits(), txn.getTxnIdMostBits());
Assert.assertEquals(messageMetadata.getTxnidLeastBits(), txn.getTxnIdLeastBits());
byte[] bytes = new byte[entries.get(i).getDataBuffer().readableBytes()];
entries.get(i).getDataBuffer().readBytes(bytes);
Assert.assertTrue(messageSet.remove(new String(bytes)));
}
// check abort marker
messageMetadata = Commands.parseMessageMetadata(entries.get(messageCnt).getDataBuffer());
Assert.assertEquals(PulsarMarkers.MarkerType.TXN_ABORT_VALUE, messageMetadata.getMarkerType());
Assert.assertEquals(0, messageSet.size());
log.info("finish test produceAndAbortTest.");
}
private void checkMessageId(List<CompletableFuture<MessageId>> futureList, boolean isFinished) {
futureList.forEach(messageIdFuture -> {
try {
MessageId messageId = messageIdFuture.get(1, TimeUnit.SECONDS);
if (isFinished) {
Assert.assertNotNull(messageId);
log.info("Tnx commit success! messageId: {}", messageId);
} else {
Assert.fail("MessageId shouldn't be get before txn abort.");
}
} catch (Exception e) {
if (!isFinished) {
if (e instanceof TimeoutException) {
log.info("This is a expected exception.");
} else {
log.error("This exception is not expected.", e);
Assert.fail("This exception is not expected.");
}
} else {
log.error("Tnx commit failed!", e);
Assert.fail("Tnx commit failed!");
}
}
});
}
private ReadOnlyCursor getTBTopicCursor(String topic, int partition) {
try {
String tbTopicName = PersistentTransactionBuffer.getTransactionBufferTopicName(
TopicName.get(topic).toString() + TopicName.PARTITIONED_TOPIC_SUFFIX + partition);
String topicSuffix = partition >= 0 ? TopicName.PARTITIONED_TOPIC_SUFFIX + partition : "";
topic = PersistentTransactionBuffer.getTransactionBufferTopicName(
TopicName.get(topic).toString() + topicSuffix);
return getPulsarServiceList().get(0).getManagedLedgerFactory().openReadOnlyCursor(
TopicName.get(tbTopicName).getPersistenceNamingEncoding(),
TopicName.get(topic).getPersistenceNamingEncoding(),
PositionImpl.earliest, new ManagedLedgerConfig());
} catch (Exception e) {
log.error("Failed to get transaction buffer topic readonly cursor.", e);
......@@ -237,9 +317,11 @@ public class TransactionProduceTest extends TransactionTestBase {
private ReadOnlyCursor getOriginTopicCursor(String topic, int partition) {
try {
String partitionTopic = TopicName.get(topic).toString() + TopicName.PARTITIONED_TOPIC_SUFFIX + partition;
if (partition >= 0) {
topic = TopicName.get(topic).toString() + TopicName.PARTITIONED_TOPIC_SUFFIX + partition;
}
return getPulsarServiceList().get(0).getManagedLedgerFactory().openReadOnlyCursor(
TopicName.get(partitionTopic).getPersistenceNamingEncoding(),
TopicName.get(topic).getPersistenceNamingEncoding(),
PositionImpl.earliest, new ManagedLedgerConfig());
} catch (Exception e) {
log.error("Failed to get origin topic readonly cursor.", e);
......
......@@ -87,7 +87,7 @@ public class EndToEndTest extends TransactionTestBase {
}
@Test
public void test() throws Exception {
public void partitionCommitTest() throws Exception {
Transaction txn = ((PulsarClientImpl) pulsarClient)
.newTransaction()
.withTransactionTimeout(2, TimeUnit.SECONDS)
......@@ -149,4 +149,47 @@ public class EndToEndTest extends TransactionTestBase {
log.info("receive transaction messages count: {}", receiveCnt);
}
@Test
public void partitionAbortTest() throws Exception {
Transaction txn = ((PulsarClientImpl) pulsarClient)
.newTransaction()
.withTransactionTimeout(2, TimeUnit.SECONDS)
.build()
.get();
@Cleanup
PartitionedProducerImpl<byte[]> producer = (PartitionedProducerImpl<byte[]>) pulsarClient
.newProducer()
.topic(TOPIC_OUTPUT)
.sendTimeout(0, TimeUnit.SECONDS)
.enableBatching(false)
.create();
int messageCnt = 10;
for (int i = 0; i < messageCnt; i++) {
producer.newMessage(txn).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();
}
@Cleanup
MultiTopicsConsumerImpl<byte[]> consumer = (MultiTopicsConsumerImpl<byte[]>) pulsarClient
.newConsumer()
.topic(TOPIC_OUTPUT)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionName("test")
.enableBatchIndexAcknowledgment(true)
.subscribe();
// Can't receive transaction messages before abort.
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNull(message);
txn.abort().get();
// Cant't receive transaction messages after abort.
message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNull(message);
log.info("finished test partitionAbortTest");
}
}
......@@ -269,6 +269,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
@Override
CompletableFuture<MessageId> internalSendAsync(Message<?> message, Transaction txn) {
if (txn instanceof TransactionImpl) {
((TransactionImpl) txn).registerProducedTopic(topic);
}
CompletableFuture<MessageId> future = new CompletableFuture<>();
......@@ -343,9 +346,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
nextCallback = scb;
}
});
if (txn instanceof TransactionImpl) {
((TransactionImpl) txn).registerProducedTopic(topic);
}
return future;
}
......
......@@ -138,6 +138,12 @@ public class TransactionImpl implements Transaction {
@Override
public CompletableFuture<Void> abort() {
return FutureUtil.failedFuture(new UnsupportedOperationException("Not Implemented Yet"));
return tcClient.abortAsync(new TxnID(txnIdMostBits, txnIdLeastBits)).whenComplete((ignored, throwable) -> {
sendOps.values().forEach(txnSendOp -> {
txnSendOp.sendFuture.whenComplete(((messageId, t) -> {
txnSendOp.transactionalSendFuture.complete(messageId);
}));
});
});
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册