From 5bf319e46f70350cc162210c1583e608be581b4e Mon Sep 17 00:00:00 2001 From: Kezhu Wang Date: Mon, 5 Aug 2019 14:44:43 +0800 Subject: [PATCH] Fix concurrent access of `uninitializedCursors` in `ManagedLedgerImpl.asyncOpenCursor` (#4837) ### Motivation Fix concurrent access of `uninitializedCursors` in `ManagedLedgerImpl.asyncOpenCursor`. ### Modifications * Adds test to expose concurrent access of `uninitializedCursors` in `ManagedLedgerImpl.asyncOpenCursor`. * Fixes concurrent access of `uninitializedCursors` in `ManagedLedgerImpl.asyncOpenCursor`. --- .../mledger/impl/ManagedLedgerImpl.java | 4 +- .../mledger/impl/ManagedLedgerTest.java | 86 +++++++++++++++++++ 2 files changed, 88 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 1095a90f875..749e560918d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -728,7 +728,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { cursor.initializeCursorPosition(initialPosition == InitialPosition.Latest ? getLastPositionAndCounter() : getFirstPositionAndCounter()); - synchronized (this) { + synchronized (ManagedLedgerImpl.this) { cursors.add(cursor); uninitializedCursors.remove(cursorName).complete(cursor); } @@ -739,7 +739,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { public void operationFailed(ManagedLedgerException exception) { log.warn("[{}] Failed to open cursor: {}", name, cursor); - synchronized (this) { + synchronized (ManagedLedgerImpl.this) { uninitializedCursors.remove(cursorName).completeExceptionally(exception); } callback.openCursorFailed(exception, ctx); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index d54f11275e0..9f6d96a10e8 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -20,9 +20,12 @@ package org.apache.bookkeeper.mledger.impl; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -45,6 +48,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -53,6 +57,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -94,6 +99,7 @@ import org.apache.bookkeeper.mledger.impl.MetaStore.Stat; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; @@ -2182,6 +2188,86 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { ledger.close(); } + @Test + public void testConcurrentOpenCursorShouldNotHaveConcurrentAccessOfUninitializedCursors() throws Exception { + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("ConcurrentAccessOfUninitializedCursors"); + + final CompletableFuture cursorFuture = new CompletableFuture<>(); + final CompletableFuture removingFuture = new CompletableFuture<>(); + final CompletableFuture concurrentAccessFuture = new CompletableFuture<>(); + final Throwable concurrentAccessTimeout = new TimeoutException(); + + cachedExecutor.execute(() -> { + removingFuture.join(); + CompletableFuture lockingFuture = new CompletableFuture<>(); + cachedExecutor.execute(() -> { + try { + lockingFuture.join(); + + // Gives `synchronized (ledger)` a chance to complete if it got lock immediately. + Thread.sleep(2); + + // Normally, following code will process after success or failure contention of + // `synchronized (ledger)`. Theoretically, it is possible that following code + // complete before contention of `synchronized (ledger)` block, but it is rare + // in practice, and it is not harmful as it produces only false positive cases. + concurrentAccessFuture.completeExceptionally(concurrentAccessTimeout); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + }); + lockingFuture.complete(null); + synchronized (ledger) { + concurrentAccessFuture.complete(null); + } + }); + + Map> uninitializedCursors = ledger.uninitializedCursors; + Map> spyUninitializedCursors = spy(uninitializedCursors); + doAnswer(mock -> { + removingFuture.complete(null); + try { + // Access of uninitializedCursors should guarded by synchronized(ledger), + // so there are must be no concurrent accesses in this scope. If we get this + // future successfully, then there is a concurrent access. + concurrentAccessFuture.get(); + Throwable throwable = new IllegalStateException("Detecting concurrent access of uninitializedCursors"); + cursorFuture.completeExceptionally(throwable); + } catch (Exception ex) { + assertSame(ExceptionUtils.getRootCause(ex), concurrentAccessTimeout); + } + return mock.callRealMethod(); + }).when(spyUninitializedCursors).remove(anyString()); + setFieldValue(ManagedLedgerImpl.class, ledger, "uninitializedCursors", spyUninitializedCursors); + + cachedExecutor.execute(() -> { + try { + ledger.asyncOpenCursor("c1", new OpenCursorCallback() { + @Override + public void openCursorFailed(ManagedLedgerException exception, Object ctx) { + cursorFuture.completeExceptionally(exception); + } + + @Override + public void openCursorComplete(ManagedCursor cursor, Object ctx) { + cursorFuture.complete(cursor); + } + }, null); + } catch (Exception e) { + cursorFuture.completeExceptionally(e); + } + }); + + try { + ManagedCursor cursor = cursorFuture.get(); + assertNotNull(cursor); + } catch (Exception ex) { + fail(ExceptionUtils.getRootCauseMessage(ex)); + } finally { + ledger.close(); + } + } + public ByteBuf getMessageWithMetadata(byte[] data) throws IOException { MessageMetadata messageData = MessageMetadata.newBuilder().setPublishTime(System.currentTimeMillis()) .setProducerName("prod-name").setSequenceId(0).build(); -- GitLab