diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java index c08fb9c1d4f5fb36b63d05d61bda397ed7d89526..1acbc824f9d2936e28d69e19193dfdff1d77d548 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java @@ -148,6 +148,12 @@ public class EntryCacheImpl implements EntryCache { public void invalidateEntries(final PositionImpl lastPosition) { final PositionImpl firstPosition = PositionImpl.get(-1, 0); + if (firstPosition.compareTo(lastPosition) > 0) { + log.debug("Attempted to invalidate entries in an invalid range : {} ~ {}", + firstPosition, lastPosition); + return; + } + Pair removed = entries.removeRange(firstPosition, lastPosition, false); int entriesRemoved = removed.getLeft(); long sizeRemoved = removed.getRight(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java index 1f8dade9e96b299b57bb7fcf00ef7dc55fea7292..7ca4ebcdefc11e6b4c2c30176e74eb73698c0520 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java @@ -45,7 +45,7 @@ import org.apache.commons.lang3.tuple.Pair; * care about ledgers to be deleted. * */ -class ManagedCursorContainer implements Iterable { +public class ManagedCursorContainer implements Iterable { private static class Item { final ManagedCursor cursor; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 31561a8e2df625e46b9cc8ed54deb41186828fc6..99aad05ca5708b53fa7aeb80aaf9b84d39db2504 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -104,7 +104,7 @@ public class ManagedCursorImpl implements ManagedCursor { protected volatile PositionImpl markDeletePosition; protected volatile PositionImpl readPosition; - private volatile MarkDeleteEntry lastMarkDeleteEntry; + protected volatile MarkDeleteEntry lastMarkDeleteEntry; protected static final AtomicReferenceFieldUpdater WAITING_READ_OP_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, OpReadEntry.class, "waitingReadOp"); @@ -177,7 +177,7 @@ public class ManagedCursorImpl implements ManagedCursor { } } - private final ArrayDeque pendingMarkDeleteOps = new ArrayDeque<>(); + protected final ArrayDeque pendingMarkDeleteOps = new ArrayDeque<>(); private static final AtomicIntegerFieldUpdater PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "pendingMarkDeletedSubmittedCount"); @SuppressWarnings("unused") diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index ce3b177fe8986480cffe3b5ed7210155d219475a..8fb77d9d02c3eb5b3323c13abcb0e7b5c01f779d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -828,11 +828,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { @Override public ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException { - checkManagedLedgerIsOpen(); - checkFenced(); - - return new NonDurableCursorImpl(bookKeeper, config, this, null, - (PositionImpl) startCursorPosition); + return newNonDurableCursor( + startCursorPosition, + "non-durable-cursor-" + UUID.randomUUID()); } @Override @@ -862,12 +860,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } @Override - public Iterable getCursors() { + public ManagedCursorContainer getCursors() { return cursors; } @Override - public Iterable getActiveCursors() { + public ManagedCursorContainer getActiveCursors() { return activeCursors; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java index 43bab0f6296af76cdf9ca4f904dbcac07dbe9ea6..bc57f79c58c07374b20769c6fd6b5f550c98e2a8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java @@ -42,7 +42,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl { if (startCursorPosition == null || startCursorPosition.getLedgerId() == PositionImpl.latest.getLedgerId()) { // Start from last entry initializeCursorPosition(ledger.getLastPositionAndCounter()); - } else if (startCursorPosition.equals(PositionImpl.earliest)) { + } else if (startCursorPosition.getLedgerId() == PositionImpl.earliest.getLedgerId()) { // Start from invalid ledger to read from first available entry recoverCursor(ledger.getPreviousPosition(ledger.getFirstPosition())); } else { @@ -83,6 +83,12 @@ public class NonDurableCursorImpl extends ManagedCursorImpl { protected void internalAsyncMarkDelete(final PositionImpl newPosition, Map properties, final MarkDeleteCallback callback, final Object ctx) { // Bypass persistence of mark-delete position and individually deleted messages info + + MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, callback, ctx); + lastMarkDeleteEntry = mdEntry; + // it is important to advance cursor so the retention can kick in as expected. + ledger.updateCursor(NonDurableCursorImpl.this, mdEntry.newPosition); + callback.markDeleteComplete(ctx); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java index 1579a7a71d20e36a2fbc8165e78def7a56bd6896..957e63bfa3e6a263a78cf8496ff819519c369be2 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.mledger.impl; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; @@ -80,7 +81,7 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase { ManagedLedger ledger = factory.open("my_test_ledger"); ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.earliest); - assertTrue(Iterables.isEmpty(ledger.getCursors())); + assertFalse(Iterables.isEmpty(ledger.getCursors())); c1.close(); ledger.close(); @@ -610,6 +611,50 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase { ledger.close(); } + @Test + public void testGetSlowestConsumer() throws Exception { + final String mlName = "test-get-slowest-consumer-ml"; + final String c1 = "cursor1"; + final String nc1 = "non-durable-cursor1"; + final String ncEarliest = "non-durable-cursor-earliest"; + + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, new ManagedLedgerConfig()); + Position p1 = ledger.addEntry(c1.getBytes(UTF_8)); + log.info("write entry 1 : pos = {}", p1); + Position p2 = ledger.addEntry(nc1.getBytes(UTF_8)); + log.info("write entry 2 : pos = {}", p2); + Position p3 = ledger.addEntry(nc1.getBytes(UTF_8)); + log.info("write entry 3 : pos = {}", p3); + + ManagedCursor cursor1 = ledger.openCursor(c1); + cursor1.seek(p3); + assertEquals(p3, ledger.getCursors().getSlowestReaderPosition()); + + ManagedCursor nonCursor1 = ledger.newNonDurableCursor(p2, nc1); + assertEquals(p2, ledger.getCursors().getSlowestReaderPosition()); + + PositionImpl earliestPos = new PositionImpl(-1, -2); + + ManagedCursor nonCursorEarliest = ledger.newNonDurableCursor(earliestPos, ncEarliest); + PositionImpl expectedPos = new PositionImpl(((PositionImpl) p1).getLedgerId(), -1); + assertEquals(expectedPos, ledger.getCursors().getSlowestReaderPosition()); + + // move non-durable cursor should update the slowest reader position + nonCursorEarliest.markDelete(p1); + assertEquals(p1, ledger.getCursors().getSlowestReaderPosition()); + + nonCursorEarliest.markDelete(p2); + assertEquals(p2, ledger.getCursors().getSlowestReaderPosition()); + + nonCursorEarliest.markDelete(p3); + assertEquals(p2, ledger.getCursors().getSlowestReaderPosition()); + + nonCursor1.markDelete(p3); + assertEquals(p3, ledger.getCursors().getSlowestReaderPosition()); + + ledger.close(); + } + @Test(expectedExceptions = NullPointerException.class) void testCursorWithNameIsNotNull() throws Exception { final String p1CursorName = "entry-1"; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 1a088a148c7036515892ddad97e978be11bfa080..85e7778f02d1ab7097ca71b3c57169831065409c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -670,7 +670,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal long ledgerId = msgId.getLedgerId(); long entryId = msgId.getEntryId(); - if (msgId instanceof BatchMessageIdImpl) { + if (ledgerId >= 0 + && msgId instanceof BatchMessageIdImpl) { // When the start message is relative to a batch, we need to take one step back on the previous message, // because the "batch" might not have been consumed in its entirety. // The client will then be able to discard the first messages if needed. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer.java new file mode 100644 index 0000000000000000000000000000000000000000..84f4eac59489411abb7781b5687714db755880fe --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.buffer.impl; + +import io.netty.buffer.ByteBuf; +import java.util.List; +import java.util.Set; +import java.util.SortedMap; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import lombok.Builder; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData; +import org.apache.pulsar.common.protocol.Markers; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; +import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader; +import org.apache.pulsar.broker.transaction.buffer.TransactionCursor; +import org.apache.pulsar.broker.transaction.buffer.TransactionMeta; +import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException; +import org.apache.pulsar.transaction.impl.common.TxnID; + +/** + * A persistent transaction buffer implementation. + */ +@Slf4j +public class PersistentTransactionBuffer extends PersistentTopic implements TransactionBuffer { + + private TransactionCursor txnCursor; + private ManagedCursor retentionCursor; + + + abstract static class TxnCtx implements PublishContext { + private final long sequenceId; + private final CompletableFuture completableFuture; + private final String producerName; + + TxnCtx(String producerName, long sequenceId, CompletableFuture future) { + this.sequenceId = sequenceId; + this.completableFuture = future; + this.producerName = producerName; + } + + + + @Override + public String getProducerName() { + return this.producerName; + } + + @Override + public long getSequenceId() { + return this.sequenceId; + } + } + + public PersistentTransactionBuffer(String topic, ManagedLedger ledger, BrokerService brokerService) + throws BrokerServiceException.NamingException, ManagedLedgerException { + super(topic, ledger, brokerService); + this.txnCursor = new TransactionCursorImpl(); + this.retentionCursor = ledger.newNonDurableCursor( + PositionImpl.earliest, "txn-buffer-retention"); + } + + @Override + public CompletableFuture getTransactionMeta(TxnID txnID) { + return txnCursor.getTxnMeta(txnID, false); + } + + @Override + public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) { + return publishMessage(txnId, buffer, sequenceId).thenCompose(position -> appendBuffer(txnId, position, + sequenceId)); + } + + private CompletableFuture appendBuffer(TxnID txnID, Position position, long sequenceId) { + return txnCursor.getTxnMeta(txnID, true).thenCompose(meta -> meta.appendEntry(sequenceId, position)); + } + + @Override + public CompletableFuture openTransactionBufferReader(TxnID txnID, long startSequenceId) { + return txnCursor.getTxnMeta(txnID, false).thenCompose(this::createNewReader); + } + + private CompletableFuture createNewReader(TransactionMeta meta) { + CompletableFuture createReaderFuture = new CompletableFuture<>(); + + try { + PersistentTransactionBufferReader reader = new PersistentTransactionBufferReader(meta, ledger); + createReaderFuture.complete(reader); + } catch (TransactionNotSealedException e) { + createReaderFuture.completeExceptionally(e); + } + + return createReaderFuture; + } + + @Builder + final static class Marker { + long sequenceId; + ByteBuf marker; + } + + @Override + public CompletableFuture commitTxn(TxnID txnID, long committedAtLedgerId, long committedAtEntryId) { + return txnCursor.getTxnMeta(txnID, false) + .thenApply(meta -> createCommitMarker(meta, committedAtLedgerId, committedAtEntryId)) + .thenCompose(marker -> publishMessage(txnID, marker.marker, marker.sequenceId)) + .thenCompose(position -> txnCursor.commitTxn(committedAtLedgerId, committedAtEntryId, txnID, + position)); + } + + private Marker createCommitMarker(TransactionMeta meta, long committedAtLedgerId, long committedAtEntryId) { + if (log.isDebugEnabled()) { + log.debug("Transaction {} create a commit marker", meta.id()); + } + long sequenceId = meta.lastSequenceId() + 1; + MessageIdData messageIdData = MessageIdData.newBuilder() + .setLedgerId(committedAtLedgerId) + .setEntryId(committedAtEntryId) + .build(); + ByteBuf commitMarker = Markers.newTxnCommitMarker(sequenceId, meta.id().getMostSigBits(), + meta.id().getLeastSigBits(), messageIdData); + Marker marker = Marker.builder().sequenceId(sequenceId).marker(commitMarker).build(); + return marker; + } + + @Override + public CompletableFuture abortTxn(TxnID txnID) { + return txnCursor.getTxnMeta(txnID, false) + .thenApply(meta -> createAbortMarker(meta)) + .thenCompose(marker -> publishMessage(txnID, marker.marker, marker.sequenceId)) + .thenCompose(position -> txnCursor.abortTxn(txnID)); + } + + private Marker createAbortMarker(TransactionMeta meta) { + if (log.isDebugEnabled()) { + log.debug("Transaction {} create a abort marker", meta.id()); + } + long sequenceId = meta.lastSequenceId() + 1; + ByteBuf abortMarker = Markers.newTxnAbortMarker(sequenceId, meta.id().getMostSigBits(), + meta.id().getLeastSigBits()); + Marker marker = Marker.builder().sequenceId(sequenceId).marker(abortMarker).build(); + return marker; + } + + + private CompletableFuture publishMessage(TxnID txnID, ByteBuf msg, long sequenceId) { + CompletableFuture publishFuture = new CompletableFuture<>(); + publishMessage(msg, new TxnCtx(txnID.toString(), sequenceId, publishFuture) { + @Override + public void completed(Exception e, long ledgerId, long entryId) { + if (e != null) { + publishFuture.completeExceptionally(e); + } else { + publishFuture.complete(PositionImpl.get(ledgerId, entryId)); + } + } + }); + return publishFuture; + } + + @Override + public CompletableFuture purgeTxns(List dataLedgers) { + if (log.isDebugEnabled()) { + log.debug("Begin to purge the ledgers {}", dataLedgers); + } + + List> futures = dataLedgers.stream().map(dataLedger -> cleanTxnsOnLedger(dataLedger)) + .collect(Collectors.toList()); + return FutureUtil.waitForAll(futures).thenCompose(v -> removeCommittedLedgerFromIndex(dataLedgers)); + } + + private CompletableFuture removeCommittedLedgerFromIndex(List dataLedgers) { + List> removeFutures = dataLedgers.stream().map( + dataLedger -> txnCursor.removeTxnsCommittedAtLedger(dataLedger)).collect(Collectors.toList()); + return FutureUtil.waitForAll(removeFutures); + } + + private CompletableFuture cleanTxnsOnLedger(long dataledger) { + if (log.isDebugEnabled()) { + log.debug("Start to clean ledger {}", dataledger); + } + return txnCursor.getAllTxnsCommittedAtLedger(dataledger).thenCompose(txnIDS -> deleteTxns(txnIDS)); + } + + private CompletableFuture deleteTxns(Set txnIDS) { + if (log.isDebugEnabled()) { + log.debug("Start delete txns {} under ledger", txnIDS); + } + List> futures = txnIDS.stream().map(txnID -> deleteTxn(txnID)) + .collect(Collectors.toList()); + return FutureUtil.waitForAll(futures); + } + + private CompletableFuture deleteTxn(TxnID txnID) { + if (log.isDebugEnabled()) { + log.debug("Start to delete txn {} entries", txnID); + } + return txnCursor.getTxnMeta(txnID, false) + .thenCompose(meta -> meta.readEntries(meta.numEntries(), -1L)) + .thenCompose(longPositionSortedMap -> deleteEntries(longPositionSortedMap, txnID)); + } + + private CompletableFuture deleteEntries(SortedMap entriesMap, TxnID txnID) { + if (log.isDebugEnabled()) { + log.debug("Delete entries {}", entriesMap); + } + List> deleteFutures = entriesMap.values().stream() + .map(position -> asyncDeletePosition(position, txnID)) + .collect(Collectors.toList()); + + return FutureUtil.waitForAll(deleteFutures); + } + + private CompletableFuture asyncDeletePosition(Position position, TxnID txnID) { + if (log.isDebugEnabled()) { + log.debug("Ready to delete position {} for txn {}", position, txnID); + } + CompletableFuture deleteFuture = new CompletableFuture<>(); + retentionCursor.asyncMarkDelete(position, new AsyncCallbacks.MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + if (log.isDebugEnabled()) { + log.debug("Success delete transaction `{}` entry on position {}", txnID, position); + } + deleteFuture.complete(null); + } + + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + log.error("Failed delete transaction `{}` entry on position {}", txnID, position, exception); + deleteFuture.completeExceptionally(exception); + } + }, null); + + return deleteFuture; + } + + @Override + public CompletableFuture closeAsync() { + return FutureUtil.failedFuture(new UnsupportedOperationException()); + } +}