diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 1095a90f8755501b09290a9e8e3907d3f90e3bef..749e560918d4d5e1fa129e7e27e08125cb17b8a7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -728,7 +728,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { cursor.initializeCursorPosition(initialPosition == InitialPosition.Latest ? getLastPositionAndCounter() : getFirstPositionAndCounter()); - synchronized (this) { + synchronized (ManagedLedgerImpl.this) { cursors.add(cursor); uninitializedCursors.remove(cursorName).complete(cursor); } @@ -739,7 +739,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { public void operationFailed(ManagedLedgerException exception) { log.warn("[{}] Failed to open cursor: {}", name, cursor); - synchronized (this) { + synchronized (ManagedLedgerImpl.this) { uninitializedCursors.remove(cursorName).completeExceptionally(exception); } callback.openCursorFailed(exception, ctx); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index d54f11275e0991581fe3c651f220932285bdb318..9f6d96a10e8112e0d9ffa0d0e4a417a3bd9d8fd8 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -20,9 +20,12 @@ package org.apache.bookkeeper.mledger.impl; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -45,6 +48,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -53,6 +57,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -94,6 +99,7 @@ import org.apache.bookkeeper.mledger.impl.MetaStore.Stat; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; @@ -2182,6 +2188,86 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { ledger.close(); } + @Test + public void testConcurrentOpenCursorShouldNotHaveConcurrentAccessOfUninitializedCursors() throws Exception { + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("ConcurrentAccessOfUninitializedCursors"); + + final CompletableFuture cursorFuture = new CompletableFuture<>(); + final CompletableFuture removingFuture = new CompletableFuture<>(); + final CompletableFuture concurrentAccessFuture = new CompletableFuture<>(); + final Throwable concurrentAccessTimeout = new TimeoutException(); + + cachedExecutor.execute(() -> { + removingFuture.join(); + CompletableFuture lockingFuture = new CompletableFuture<>(); + cachedExecutor.execute(() -> { + try { + lockingFuture.join(); + + // Gives `synchronized (ledger)` a chance to complete if it got lock immediately. + Thread.sleep(2); + + // Normally, following code will process after success or failure contention of + // `synchronized (ledger)`. Theoretically, it is possible that following code + // complete before contention of `synchronized (ledger)` block, but it is rare + // in practice, and it is not harmful as it produces only false positive cases. + concurrentAccessFuture.completeExceptionally(concurrentAccessTimeout); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + }); + lockingFuture.complete(null); + synchronized (ledger) { + concurrentAccessFuture.complete(null); + } + }); + + Map> uninitializedCursors = ledger.uninitializedCursors; + Map> spyUninitializedCursors = spy(uninitializedCursors); + doAnswer(mock -> { + removingFuture.complete(null); + try { + // Access of uninitializedCursors should guarded by synchronized(ledger), + // so there are must be no concurrent accesses in this scope. If we get this + // future successfully, then there is a concurrent access. + concurrentAccessFuture.get(); + Throwable throwable = new IllegalStateException("Detecting concurrent access of uninitializedCursors"); + cursorFuture.completeExceptionally(throwable); + } catch (Exception ex) { + assertSame(ExceptionUtils.getRootCause(ex), concurrentAccessTimeout); + } + return mock.callRealMethod(); + }).when(spyUninitializedCursors).remove(anyString()); + setFieldValue(ManagedLedgerImpl.class, ledger, "uninitializedCursors", spyUninitializedCursors); + + cachedExecutor.execute(() -> { + try { + ledger.asyncOpenCursor("c1", new OpenCursorCallback() { + @Override + public void openCursorFailed(ManagedLedgerException exception, Object ctx) { + cursorFuture.completeExceptionally(exception); + } + + @Override + public void openCursorComplete(ManagedCursor cursor, Object ctx) { + cursorFuture.complete(cursor); + } + }, null); + } catch (Exception e) { + cursorFuture.completeExceptionally(e); + } + }); + + try { + ManagedCursor cursor = cursorFuture.get(); + assertNotNull(cursor); + } catch (Exception ex) { + fail(ExceptionUtils.getRootCauseMessage(ex)); + } finally { + ledger.close(); + } + } + public ByteBuf getMessageWithMetadata(byte[] data) throws IOException { MessageMetadata messageData = MessageMetadata.newBuilder().setPublishTime(System.currentTimeMillis()) .setProducerName("prod-name").setSequenceId(0).build();