提交 7ec92b6d 编写于 作者: X Xiaobing Fang 提交者: Jia Zhai

Fix : The frist position in managedLedger is ahead of the last (#4853)

Fixes #4852

Bug:

After create an empty ledger by sub/unsub and trim ledgers, the first position in managedLedger is ahead of the last when create a consumer without reopen the managed ledger.

Reason:

Last position will be update when opening managedLedger.
Because when creating an empty Ledger without reopening managedLedger, the last position is not be updated. Since the ledger pointed of the last position has been deleted, the first position point to the new ledger.

Fix:

The first position should fall back to last position.
(cherry picked from commit 46bfb2c2)
上级 2725ef3e
......@@ -19,6 +19,7 @@
package org.apache.bookkeeper.mledger.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.Math.min;
import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
......@@ -2763,7 +2764,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
PositionImpl getFirstPosition() {
Long ledgerId = ledgers.firstKey();
return ledgerId == null ? null : new PositionImpl(ledgerId, -1);
if (ledgerId == null) {
return null;
}
if (ledgerId > lastConfirmedEntry.getLedgerId()) {
checkState(ledgers.get(ledgerId).getEntries() == 0);
ledgerId = lastConfirmedEntry.getLedgerId();
}
return new PositionImpl(ledgerId, -1);
}
PositionImpl getLastPosition() {
......
......@@ -1692,6 +1692,33 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
ml.close();
}
/**
* Set retention time = 0 and create a empty ledger,
* first position can't higher than last after trim ledgers.
*/
@Test
public void testRetention0WithEmptyLedger() throws Exception {
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setRetentionTime(0, TimeUnit.MINUTES);
config.setMaxEntriesPerLedger(1);
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("deletion_after_retention_test_ledger", config);
ManagedCursor c1 = ml.openCursor("c1noretention");
ml.addEntry("message1".getBytes());
c1.skipEntries(1, IndividualDeletedEntries.Exclude);
ml.close();
// reopen ml
ml = (ManagedLedgerImpl) factory.open("deletion_after_retention_test_ledger", config);
c1 = ml.openCursor("c1noretention");
ml.deleteCursor(c1.getName());
ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null));
assertTrue(ml.getFirstPosition().ledgerId <= ml.lastConfirmedEntry.ledgerId);
ml.close();
}
@Test
public void testInfiniteRetention() throws Exception {
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册