提交 b190874d 编写于 作者: S Sijie Guo 提交者: xiaolong.ran

Data is not deleted after expiration due to connected readers (#5621)

* Data is not deleted after expiration due to connected readers

*Problem*

A problem was observed when stress testing Pulsar using [pulsar-flink](https://github.com/streamnative/pulsar-flink):
no matter what TTL or retention setting is used, the data is never cleaned up, so the stress test ends up failing
because the disk fills up.

The root cause of the problem is described as below.

When a reader is opened using `MessageId.earliest`, a non-durable cursor with position `(-1, -2)` is added to the cursor heap.
The position `(-1, -2)` in the heap is never updated because non-durable cursors are never advanced when mark-deletions
happen. So the slowest cursor position is always `(-1, -2)`, which means no ledger can be deleted even if it has expired
or is over quota.

*Motivation*

Fix the problem to make sure Pulsar honors the TTL and retention settings.

*Modifications*

- Fix the `startPosition` when PersistentTopic opens a non-durable cursor on `MessageId.earliest`.
  So the `startPosition` is (-1, -1) not (-1, -2).

- Fix the `NonDurableCursorImpl` constructor to check whether the position is in the ledger of `MessageId.earliest`.
  If the provided position is in the `earliest` ledger, the mark-deleted position is set to the position
  immediately before the first position.

- Fix the `NonDurableCursorImpl` to advance ledger cursor when mark-deletion happens on a non-durable cursor.

*Verify this change*

Unit tests are coming.

(cherry picked from commit 3e7cb68b)
上级 d946a8fa
......@@ -148,6 +148,12 @@ public class EntryCacheImpl implements EntryCache {
public void invalidateEntries(final PositionImpl lastPosition) {
final PositionImpl firstPosition = PositionImpl.get(-1, 0);
if (firstPosition.compareTo(lastPosition) > 0) {
log.debug("Attempted to invalidate entries in an invalid range : {} ~ {}",
firstPosition, lastPosition);
return;
}
Pair<Integer, Long> removed = entries.removeRange(firstPosition, lastPosition, false);
int entriesRemoved = removed.getLeft();
long sizeRemoved = removed.getRight();
......
......@@ -45,7 +45,7 @@ import org.apache.commons.lang3.tuple.Pair;
* care about ledgers to be deleted.
*
*/
class ManagedCursorContainer implements Iterable<ManagedCursor> {
public class ManagedCursorContainer implements Iterable<ManagedCursor> {
private static class Item {
final ManagedCursor cursor;
......
......@@ -104,7 +104,7 @@ public class ManagedCursorImpl implements ManagedCursor {
protected volatile PositionImpl markDeletePosition;
protected volatile PositionImpl readPosition;
private volatile MarkDeleteEntry lastMarkDeleteEntry;
protected volatile MarkDeleteEntry lastMarkDeleteEntry;
protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, OpReadEntry> WAITING_READ_OP_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, OpReadEntry.class, "waitingReadOp");
......@@ -177,7 +177,7 @@ public class ManagedCursorImpl implements ManagedCursor {
}
}
private final ArrayDeque<MarkDeleteEntry> pendingMarkDeleteOps = new ArrayDeque<>();
protected final ArrayDeque<MarkDeleteEntry> pendingMarkDeleteOps = new ArrayDeque<>();
private static final AtomicIntegerFieldUpdater<ManagedCursorImpl> PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "pendingMarkDeletedSubmittedCount");
@SuppressWarnings("unused")
......
......@@ -828,11 +828,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
@Override
public ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException {
checkManagedLedgerIsOpen();
checkFenced();
return new NonDurableCursorImpl(bookKeeper, config, this, null,
(PositionImpl) startCursorPosition);
return newNonDurableCursor(
startCursorPosition,
"non-durable-cursor-" + UUID.randomUUID());
}
@Override
......@@ -862,12 +860,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
@Override
public Iterable<ManagedCursor> getCursors() {
public ManagedCursorContainer getCursors() {
return cursors;
}
@Override
public Iterable<ManagedCursor> getActiveCursors() {
public ManagedCursorContainer getActiveCursors() {
return activeCursors;
}
......
......@@ -42,7 +42,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
if (startCursorPosition == null || startCursorPosition.getLedgerId() == PositionImpl.latest.getLedgerId()) {
// Start from last entry
initializeCursorPosition(ledger.getLastPositionAndCounter());
} else if (startCursorPosition.equals(PositionImpl.earliest)) {
} else if (startCursorPosition.getLedgerId() == PositionImpl.earliest.getLedgerId()) {
// Start from invalid ledger to read from first available entry
recoverCursor(ledger.getPreviousPosition(ledger.getFirstPosition()));
} else {
......@@ -83,6 +83,12 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
// Mark-delete handling for a non-durable cursor: records the request in memory, moves this
// cursor forward in the owning ledger's bookkeeping, and then reports success to the callback.
// Nothing in this method reports a failure to the callback.
protected void internalAsyncMarkDelete(final PositionImpl newPosition, Map<String, Long> properties,
final MarkDeleteCallback callback, final Object ctx) {
// Bypass persistence of mark-delete position and individually deleted messages info
MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, callback, ctx);
// Remember the most recent mark-delete request on the cursor itself.
lastMarkDeleteEntry = mdEntry;
// it is important to advance cursor so the retention can kick in as expected.
ledger.updateCursor(NonDurableCursorImpl.this, mdEntry.newPosition);
// Invoked inline, right after the cursor update above.
callback.markDeleteComplete(ctx);
}
......
......@@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
......@@ -80,7 +81,7 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
ManagedLedger ledger = factory.open("my_test_ledger");
ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.earliest);
assertTrue(Iterables.isEmpty(ledger.getCursors()));
assertFalse(Iterables.isEmpty(ledger.getCursors()));
c1.close();
ledger.close();
......@@ -610,6 +611,50 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
ledger.close();
}
/**
 * Checks that the slowest reader position reported by the ledger's cursor container follows
 * non-durable cursors, including one opened at position (-1, -2), as they are mark-deleted.
 */
@Test
public void testGetSlowestConsumer() throws Exception {
final String mlName = "test-get-slowest-consumer-ml";
final String c1 = "cursor1";
final String nc1 = "non-durable-cursor1";
final String ncEarliest = "non-durable-cursor-earliest";
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, new ManagedLedgerConfig());
// Write three entries; only their positions matter, the payloads just reuse the cursor names.
Position p1 = ledger.addEntry(c1.getBytes(UTF_8));
log.info("write entry 1 : pos = {}", p1);
Position p2 = ledger.addEntry(nc1.getBytes(UTF_8));
log.info("write entry 2 : pos = {}", p2);
Position p3 = ledger.addEntry(nc1.getBytes(UTF_8));
log.info("write entry 3 : pos = {}", p3);
// With a single cursor seeked to p3, p3 is expected to be the slowest reader position.
ManagedCursor cursor1 = ledger.openCursor(c1);
cursor1.seek(p3);
assertEquals(p3, ledger.getCursors().getSlowestReaderPosition());
// A non-durable cursor opened at p2 is expected to pull the slowest position back to p2.
ManagedCursor nonCursor1 = ledger.newNonDurableCursor(p2, nc1);
assertEquals(p2, ledger.getCursors().getSlowestReaderPosition());
// Open a second non-durable cursor at (-1, -2); the slowest position is then expected to be
// entry id -1 in the ledger that holds p1, rather than the raw (-1, -2) value.
PositionImpl earliestPos = new PositionImpl(-1, -2);
ManagedCursor nonCursorEarliest = ledger.newNonDurableCursor(earliestPos, ncEarliest);
PositionImpl expectedPos = new PositionImpl(((PositionImpl) p1).getLedgerId(), -1);
assertEquals(expectedPos, ledger.getCursors().getSlowestReaderPosition());
// move non-durable cursor should update the slowest reader position
nonCursorEarliest.markDelete(p1);
assertEquals(p1, ledger.getCursors().getSlowestReaderPosition());
nonCursorEarliest.markDelete(p2);
assertEquals(p2, ledger.getCursors().getSlowestReaderPosition());
// Slowest position is expected to stay at p2 here -- presumably because nonCursor1, opened
// at p2 above, has not been moved yet.
nonCursorEarliest.markDelete(p3);
assertEquals(p2, ledger.getCursors().getSlowestReaderPosition());
// Once nonCursor1 is also mark-deleted to p3, the slowest position is expected to reach p3.
nonCursor1.markDelete(p3);
assertEquals(p3, ledger.getCursors().getSlowestReaderPosition());
ledger.close();
}
@Test(expectedExceptions = NullPointerException.class)
void testCursorWithNameIsNotNull() throws Exception {
final String p1CursorName = "entry-1";
......
......@@ -670,7 +670,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
long ledgerId = msgId.getLedgerId();
long entryId = msgId.getEntryId();
if (msgId instanceof BatchMessageIdImpl) {
if (ledgerId >= 0
&& msgId instanceof BatchMessageIdImpl) {
// When the start message is relative to a batch, we need to take one step back on the previous message,
// because the "batch" might not have been consumed in its entirety.
// The client will then be able to discard the first messages if needed.
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.transaction.buffer.impl;
import io.netty.buffer.ByteBuf;
import java.util.List;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.TransactionCursor;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
import org.apache.pulsar.transaction.impl.common.TxnID;
/**
 * A persistent transaction buffer implementation.
 *
 * <p>Transaction payloads and commit/abort markers are published through the inherited
 * {@code publishMessage} path, while a {@link TransactionCursor} keeps the per-transaction
 * metadata. Purging mark-deletes the affected positions on a dedicated non-durable cursor.
 */
@Slf4j
public class PersistentTransactionBuffer extends PersistentTopic implements TransactionBuffer {

    // Transaction metadata index; all lookups, commits, aborts and removals go through it.
    private TransactionCursor txnCursor;

    // Non-durable cursor named "txn-buffer-retention", opened at PositionImpl.earliest.
    // asyncDeletePosition() issues its mark-deletes on this cursor.
    private ManagedCursor retentionCursor;

    /**
     * Publish context used for entries written on behalf of a transaction. Subclasses supply
     * the {@code completed} callback; this base class only carries the producer name and the
     * sequence id.
     */
    abstract static class TxnCtx implements PublishContext {
        private final long sequenceId;
        private final CompletableFuture<Position> completableFuture;
        private final String producerName;

        TxnCtx(String producerName, long sequenceId, CompletableFuture<Position> future) {
            this.producerName = producerName;
            this.sequenceId = sequenceId;
            this.completableFuture = future;
        }

        @Override
        public String getProducerName() {
            return producerName;
        }

        @Override
        public long getSequenceId() {
            return sequenceId;
        }
    }

    /**
     * Creates the buffer on top of the given managed ledger.
     *
     * @param topic         topic name, passed to the {@link PersistentTopic} constructor
     * @param ledger        managed ledger backing the buffer
     * @param brokerService owning broker service
     * @throws BrokerServiceException.NamingException declared by the super constructor
     * @throws ManagedLedgerException if the retention cursor cannot be created
     */
    public PersistentTransactionBuffer(String topic, ManagedLedger ledger, BrokerService brokerService)
            throws BrokerServiceException.NamingException, ManagedLedgerException {
        super(topic, ledger, brokerService);
        this.txnCursor = new TransactionCursorImpl();
        this.retentionCursor = ledger.newNonDurableCursor(PositionImpl.earliest, "txn-buffer-retention");
    }

    /**
     * Looks up the metadata of an existing transaction (the lookup is made with the
     * create flag set to {@code false}).
     */
    @Override
    public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
        final boolean createIfMissing = false;
        return txnCursor.getTxnMeta(txnID, createIfMissing);
    }

    /**
     * Publishes {@code buffer} and, once it is stored, records the resulting position under
     * {@code sequenceId} in the transaction's metadata (looked up with the create flag set).
     */
    @Override
    public CompletableFuture<Void> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
        CompletableFuture<Position> published = publishMessage(txnId, buffer, sequenceId);
        return published.thenCompose(storedAt ->
                txnCursor.getTxnMeta(txnId, true)
                        .thenCompose(txnMeta -> txnMeta.appendEntry(sequenceId, storedAt)));
    }

    /**
     * Opens a reader over the entries of an existing transaction.
     *
     * <p>Note: {@code startSequenceId} is not consulted by this implementation.
     */
    @Override
    public CompletableFuture<TransactionBufferReader> openTransactionBufferReader(TxnID txnID, long startSequenceId) {
        CompletableFuture<TransactionMeta> metaFuture = txnCursor.getTxnMeta(txnID, false);
        return metaFuture.thenCompose(this::createNewReader);
    }

    // Builds a reader for the transaction; a TransactionNotSealedException from the reader
    // constructor is turned into an exceptionally completed future.
    private CompletableFuture<TransactionBufferReader> createNewReader(TransactionMeta meta) {
        try {
            return CompletableFuture.<TransactionBufferReader>completedFuture(
                    new PersistentTransactionBufferReader(meta, ledger));
        } catch (TransactionNotSealedException notSealed) {
            CompletableFuture<TransactionBufferReader> failed = new CompletableFuture<>();
            failed.completeExceptionally(notSealed);
            return failed;
        }
    }

    /**
     * A commit or abort marker payload together with the sequence id it is published under.
     */
    @Builder
    static final class Marker {
        long sequenceId;
        ByteBuf marker;
    }

    /**
     * Commits a transaction: publishes a commit marker and then registers the commit, with the
     * marker's position, in the transaction cursor.
     */
    @Override
    public CompletableFuture<Void> commitTxn(TxnID txnID, long committedAtLedgerId, long committedAtEntryId) {
        CompletableFuture<Marker> markerFuture = txnCursor.getTxnMeta(txnID, false)
                .thenApply(txnMeta -> createCommitMarker(txnMeta, committedAtLedgerId, committedAtEntryId));
        CompletableFuture<Position> publishedMarker = markerFuture
                .thenCompose(commit -> publishMessage(txnID, commit.marker, commit.sequenceId));
        return publishedMarker.thenCompose(markerPosition ->
                txnCursor.commitTxn(committedAtLedgerId, committedAtEntryId, txnID, markerPosition));
    }

    // Builds the commit marker; it takes the sequence id right after the transaction's last one.
    private Marker createCommitMarker(TransactionMeta meta, long committedAtLedgerId, long committedAtEntryId) {
        if (log.isDebugEnabled()) {
            log.debug("Transaction {} create a commit marker", meta.id());
        }

        final long markerSequenceId = meta.lastSequenceId() + 1;
        final MessageIdData committedAt = MessageIdData.newBuilder()
                .setLedgerId(committedAtLedgerId)
                .setEntryId(committedAtEntryId)
                .build();
        final ByteBuf payload = Markers.newTxnCommitMarker(
                markerSequenceId,
                meta.id().getMostSigBits(),
                meta.id().getLeastSigBits(),
                committedAt);
        return Marker.builder()
                .sequenceId(markerSequenceId)
                .marker(payload)
                .build();
    }

    /**
     * Aborts a transaction: publishes an abort marker and then registers the abort in the
     * transaction cursor.
     */
    @Override
    public CompletableFuture<Void> abortTxn(TxnID txnID) {
        CompletableFuture<Marker> markerFuture = txnCursor.getTxnMeta(txnID, false)
                .thenApply(this::createAbortMarker);
        CompletableFuture<Position> publishedMarker = markerFuture
                .thenCompose(abort -> publishMessage(txnID, abort.marker, abort.sequenceId));
        return publishedMarker.thenCompose(ignoredPosition -> txnCursor.abortTxn(txnID));
    }

    // Builds the abort marker; it takes the sequence id right after the transaction's last one.
    private Marker createAbortMarker(TransactionMeta meta) {
        if (log.isDebugEnabled()) {
            log.debug("Transaction {} create a abort marker", meta.id());
        }

        final long markerSequenceId = meta.lastSequenceId() + 1;
        final ByteBuf payload = Markers.newTxnAbortMarker(
                markerSequenceId,
                meta.id().getMostSigBits(),
                meta.id().getLeastSigBits());
        return Marker.builder()
                .sequenceId(markerSequenceId)
                .marker(payload)
                .build();
    }

    // Adapts the callback-style publishMessage(ByteBuf, PublishContext) to a future that yields
    // the stored position, or fails with the exception handed to the callback.
    private CompletableFuture<Position> publishMessage(TxnID txnID, ByteBuf msg, long sequenceId) {
        final CompletableFuture<Position> result = new CompletableFuture<>();
        final TxnCtx publishContext = new TxnCtx(txnID.toString(), sequenceId, result) {
            @Override
            public void completed(Exception e, long ledgerId, long entryId) {
                if (e == null) {
                    result.complete(PositionImpl.get(ledgerId, entryId));
                    return;
                }
                result.completeExceptionally(e);
            }
        };
        publishMessage(msg, publishContext);
        return result;
    }

    /**
     * Purges every transaction committed at the given data ledgers, then removes those ledgers
     * from the transaction cursor's committed index.
     */
    @Override
    public CompletableFuture<Void> purgeTxns(List<Long> dataLedgers) {
        if (log.isDebugEnabled()) {
            log.debug("Begin to purge the ledgers {}", dataLedgers);
        }

        List<CompletableFuture<Void>> cleanups = dataLedgers.stream()
                .map(this::cleanTxnsOnLedger)
                .collect(Collectors.toList());
        return FutureUtil.waitForAll(cleanups)
                .thenCompose(done -> removeCommittedLedgerFromIndex(dataLedgers));
    }

    // Asks the transaction cursor to drop the transactions committed at each listed ledger.
    private CompletableFuture<Void> removeCommittedLedgerFromIndex(List<Long> dataLedgers) {
        List<CompletableFuture<Void>> removals = dataLedgers.stream()
                .map(ledgerId -> txnCursor.removeTxnsCommittedAtLedger(ledgerId))
                .collect(Collectors.toList());
        return FutureUtil.waitForAll(removals);
    }

    // Deletes the entries of every transaction committed at one data ledger.
    private CompletableFuture<Void> cleanTxnsOnLedger(long dataledger) {
        if (log.isDebugEnabled()) {
            log.debug("Start to clean ledger {}", dataledger);
        }
        return txnCursor.getAllTxnsCommittedAtLedger(dataledger).thenCompose(this::deleteTxns);
    }

    // Deletes the entries of each transaction in the set and waits for all of them.
    private CompletableFuture<Void> deleteTxns(Set<TxnID> txnIDS) {
        if (log.isDebugEnabled()) {
            log.debug("Start delete txns {} under ledger", txnIDS);
        }

        List<CompletableFuture<Void>> deletions = txnIDS.stream()
                .map(this::deleteTxn)
                .collect(Collectors.toList());
        return FutureUtil.waitForAll(deletions);
    }

    // Reads the positions recorded for one transaction and deletes each of them.
    private CompletableFuture<Void> deleteTxn(TxnID txnID) {
        if (log.isDebugEnabled()) {
            log.debug("Start to delete txn {} entries", txnID);
        }

        return txnCursor.getTxnMeta(txnID, false)
                .thenCompose(txnMeta -> txnMeta.readEntries(txnMeta.numEntries(), -1L))
                .thenCompose(positionsBySequence -> deleteEntries(positionsBySequence, txnID));
    }

    // Issues one mark-delete per recorded position and waits for all of them.
    private CompletableFuture<Void> deleteEntries(SortedMap<Long, Position> entriesMap, TxnID txnID) {
        if (log.isDebugEnabled()) {
            log.debug("Delete entries {}", entriesMap);
        }

        List<CompletableFuture<Void>> pending = entriesMap.values().stream()
                .map(entryPosition -> asyncDeletePosition(entryPosition, txnID))
                .collect(Collectors.toList());
        return FutureUtil.waitForAll(pending);
    }

    // Mark-deletes a single position on the retention cursor, bridging the callback to a future.
    private CompletableFuture<Void> asyncDeletePosition(Position position, TxnID txnID) {
        if (log.isDebugEnabled()) {
            log.debug("Ready to delete position {} for txn {}", position, txnID);
        }

        final CompletableFuture<Void> outcome = new CompletableFuture<>();
        final AsyncCallbacks.MarkDeleteCallback onMarkDelete = new AsyncCallbacks.MarkDeleteCallback() {
            @Override
            public void markDeleteComplete(Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("Success delete transaction `{}` entry on position {}", txnID, position);
                }
                outcome.complete(null);
            }

            @Override
            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                log.error("Failed delete transaction `{}` entry on position {}", txnID, position, exception);
                outcome.completeExceptionally(exception);
            }
        };
        retentionCursor.asyncMarkDelete(position, onMarkDelete, null);
        return outcome;
    }

    /**
     * Closing is not supported: the returned future always fails with
     * {@link UnsupportedOperationException}.
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        return FutureUtil.failedFuture(new UnsupportedOperationException());
    }
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册