From bdf13a1fdb54b79e0ece299535de75c653abc827 Mon Sep 17 00:00:00 2001 From: benjamintboyle Date: Sat, 13 Mar 2021 23:50:44 -0800 Subject: [PATCH] Fix intermittently failing unit tests (#7213) * Fix intermittently failing tests, mostly increasing timeout Fix several unit tests that are intermittently failing. All fixes involve increasing timeouts. Adjusted formatting in several areas within updated tests. * Remove formatting/refactoring from previous commit Superfluous formatting and refactoring was making review impossible. --- .../FlowableConcatMapSchedulerTest.java | 8 +++- .../flowable/FlowableConcatTest.java | 8 +++- .../operators/flowable/FlowableMergeTest.java | 40 +++++++++---------- .../flowable/FlowableSubscribeOnTest.java | 12 +++--- .../ObservableConcatMapSchedulerTest.java | 8 +++- .../observable/ObservableConcatTest.java | 8 +++- .../rxjava3/subjects/UnicastSubjectTest.java | 4 +- 7 files changed, 51 insertions(+), 37 deletions(-) diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapSchedulerTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapSchedulerTest.java index 825bb5565..0f7c77256 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapSchedulerTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapSchedulerTest.java @@ -355,7 +355,7 @@ public class FlowableConcatMapSchedulerTest extends RxJavaTest { } @Test - public void issue2890NoStackoverflow() throws InterruptedException { + public void issue2890NoStackoverflow() throws InterruptedException, TimeoutException { final ExecutorService executor = Executors.newFixedThreadPool(2); final Scheduler sch = Schedulers.from(executor); @@ -400,7 +400,11 @@ public class FlowableConcatMapSchedulerTest extends RxJavaTest { } }); - executor.awaitTermination(20000, TimeUnit.MILLISECONDS); + long awaitTerminationTimeoutMillis = 100_000; + if (!executor.awaitTermination(awaitTerminationTimeoutMillis, TimeUnit.MILLISECONDS)) { + throw new TimeoutException("Completed " + counter.get() + "/" + n + " before timed out after " + + awaitTerminationTimeoutMillis + " milliseconds."); + } assertEquals(n, counter.get()); } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatTest.java index 1a8b6f4c4..986dddad0 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatTest.java @@ -729,7 +729,7 @@ public class FlowableConcatTest { } @Test - public void issue2890NoStackoverflow() throws InterruptedException { + public void issue2890NoStackoverflow() throws InterruptedException, TimeoutException { final ExecutorService executor = Executors.newFixedThreadPool(2); final Scheduler sch = Schedulers.from(executor); @@ -774,7 +774,11 @@ public class FlowableConcatTest { } }); - executor.awaitTermination(20000, TimeUnit.MILLISECONDS); + long awaitTerminationTimeoutMillis = 100_000; + if (!executor.awaitTermination(awaitTerminationTimeoutMillis, TimeUnit.MILLISECONDS)) { + throw new TimeoutException("Completed " + counter.get() + "/" + n + " before timed out after " + + awaitTerminationTimeoutMillis + " milliseconds."); + } assertEquals(n, counter.get()); } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableMergeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableMergeTest.java index a40f7debb..57b7a4a66 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableMergeTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableMergeTest.java @@ -204,7 +204,7 @@ public class FlowableMergeTest extends RxJavaTest { TestSubscriber ts = new TestSubscriber<>(stringSubscriber); m.subscribe(ts); - ts.awaitDone(5, TimeUnit.SECONDS); + ts.awaitDone(10, TimeUnit.SECONDS); ts.assertNoErrors(); verify(stringSubscriber, never()).onError(any(Throwable.class)); @@ -598,7 +598,7 @@ public class FlowableMergeTest extends RxJavaTest { TestSubscriber ts = new TestSubscriber<>(); merge.subscribe(ts); - ts.awaitDone(5, TimeUnit.SECONDS); + ts.awaitDone(10, TimeUnit.SECONDS); ts.assertComplete(); List onNextEvents = ts.values(); assertEquals(300, onNextEvents.size()); @@ -645,7 +645,7 @@ public class FlowableMergeTest extends RxJavaTest { TestSubscriber ts = new TestSubscriber<>(); merge.subscribe(ts); - ts.awaitDone(5, TimeUnit.SECONDS); + ts.awaitDone(10, TimeUnit.SECONDS); ts.assertNoErrors(); ts.assertComplete(); List onNextEvents = ts.values(); @@ -706,7 +706,7 @@ public class FlowableMergeTest extends RxJavaTest { }; Flowable.merge(f1.take(Flowable.bufferSize() * 2), Flowable.just(-99)).subscribe(testSubscriber); - testSubscriber.awaitDone(5, TimeUnit.SECONDS); + testSubscriber.awaitDone(10, TimeUnit.SECONDS); List onNextEvents = testSubscriber.values(); @@ -752,7 +752,7 @@ public class FlowableMergeTest extends RxJavaTest { }; Flowable.merge(f1.take(Flowable.bufferSize() * 2), f2.take(Flowable.bufferSize() * 2)).observeOn(Schedulers.computation()).subscribe(testSubscriber); - testSubscriber.awaitDone(5, TimeUnit.SECONDS); + testSubscriber.awaitDone(10, TimeUnit.SECONDS); if (testSubscriber.errors().size() > 0) { testSubscriber.errors().get(0).printStackTrace(); } @@ -795,7 +795,7 @@ public class FlowableMergeTest extends RxJavaTest { }; Flowable.merge(f1).observeOn(Schedulers.computation()).take(Flowable.bufferSize() * 2).subscribe(testSubscriber); - testSubscriber.awaitDone(5, TimeUnit.SECONDS); + testSubscriber.awaitDone(10, TimeUnit.SECONDS); if (testSubscriber.errors().size() > 0) { testSubscriber.errors().get(0).printStackTrace(); } @@ -850,7 +850,7 @@ public class FlowableMergeTest extends RxJavaTest { }; Flowable.merge(f1).observeOn(Schedulers.computation()).take(Flowable.bufferSize() * 2).subscribe(testSubscriber); - testSubscriber.awaitDone(5, TimeUnit.SECONDS); + testSubscriber.awaitDone(10, TimeUnit.SECONDS); if (testSubscriber.errors().size() > 0) { testSubscriber.errors().get(0).printStackTrace(); } @@ -868,7 +868,7 @@ public class FlowableMergeTest extends RxJavaTest { public void merge1AsyncStreamOf1() { TestSubscriber ts = new TestSubscriber<>(); mergeNAsyncStreamsOfN(1, 1).subscribe(ts); - ts.awaitDone(5, TimeUnit.SECONDS); + ts.awaitDone(10, TimeUnit.SECONDS); ts.assertNoErrors(); assertEquals(1, ts.values().size()); } @@ -877,7 +877,7 @@ public class FlowableMergeTest extends RxJavaTest { public void merge1AsyncStreamOf1000() { TestSubscriber ts = new TestSubscriber<>(); mergeNAsyncStreamsOfN(1, 1000).subscribe(ts); - ts.awaitDone(5, TimeUnit.SECONDS); + ts.awaitDone(10, TimeUnit.SECONDS); ts.assertNoErrors(); assertEquals(1000, ts.values().size()); } @@ -886,7 +886,7 @@ public class FlowableMergeTest extends RxJavaTest { public void merge10AsyncStreamOf1000() { TestSubscriber ts = new TestSubscriber<>(); mergeNAsyncStreamsOfN(10, 1000).subscribe(ts); - ts.awaitDone(5, TimeUnit.SECONDS); + ts.awaitDone(10, TimeUnit.SECONDS); ts.assertNoErrors(); assertEquals(10000, ts.values().size()); } @@ -895,7 +895,7 @@ public class FlowableMergeTest extends RxJavaTest { public void merge1000AsyncStreamOf1000() { TestSubscriber ts = new TestSubscriber<>(); mergeNAsyncStreamsOfN(1000, 1000).subscribe(ts); - ts.awaitDone(5, TimeUnit.SECONDS); + ts.awaitDone(10, TimeUnit.SECONDS); ts.assertNoErrors(); assertEquals(1000000, ts.values().size()); } @@ -904,7 +904,7 @@ public class FlowableMergeTest extends RxJavaTest { public void merge2000AsyncStreamOf100() { TestSubscriber ts = new TestSubscriber<>(); mergeNAsyncStreamsOfN(2000, 100).subscribe(ts); - ts.awaitDone(5, TimeUnit.SECONDS); + ts.awaitDone(10, TimeUnit.SECONDS); ts.assertNoErrors(); assertEquals(200000, ts.values().size()); } @@ -913,7 +913,7 @@ public class FlowableMergeTest extends RxJavaTest { public void merge100AsyncStreamOf1() { TestSubscriber ts = new TestSubscriber<>(); mergeNAsyncStreamsOfN(100, 1).subscribe(ts); - ts.awaitDone(5, TimeUnit.SECONDS); + ts.awaitDone(10, TimeUnit.SECONDS); ts.assertNoErrors(); assertEquals(100, ts.values().size()); } @@ -935,7 +935,7 @@ public class FlowableMergeTest extends RxJavaTest { public void merge1SyncStreamOf1() { TestSubscriber ts = new TestSubscriber<>(); mergeNSyncStreamsOfN(1, 1).subscribe(ts); - ts.awaitDone(5, TimeUnit.SECONDS); + ts.awaitDone(10, TimeUnit.SECONDS); ts.assertNoErrors(); assertEquals(1, ts.values().size()); } @@ -944,7 +944,7 @@ public class FlowableMergeTest extends RxJavaTest { public void merge1SyncStreamOf1000000() { TestSubscriber ts = new TestSubscriber<>(); mergeNSyncStreamsOfN(1, 1000000).subscribe(ts); - ts.awaitDone(5, TimeUnit.SECONDS); + ts.awaitDone(10, TimeUnit.SECONDS); ts.assertNoErrors(); assertEquals(1000000, ts.values().size()); } @@ -953,7 +953,7 @@ public class FlowableMergeTest extends RxJavaTest { public void merge1000SyncStreamOf1000() { TestSubscriber ts = new TestSubscriber<>(); mergeNSyncStreamsOfN(1000, 1000).subscribe(ts); - ts.awaitDone(5, TimeUnit.SECONDS); + ts.awaitDone(10, TimeUnit.SECONDS); ts.assertNoErrors(); assertEquals(1000000, ts.values().size()); } @@ -962,7 +962,7 @@ public class FlowableMergeTest extends RxJavaTest { public void merge10000SyncStreamOf10() { TestSubscriber ts = new TestSubscriber<>(); mergeNSyncStreamsOfN(10000, 10).subscribe(ts); - ts.awaitDone(5, TimeUnit.SECONDS); + ts.awaitDone(10, TimeUnit.SECONDS); ts.assertNoErrors(); assertEquals(100000, ts.values().size()); } @@ -971,7 +971,7 @@ public class FlowableMergeTest extends RxJavaTest { public void merge1000000SyncStreamOf1() { TestSubscriber ts = new TestSubscriber<>(); mergeNSyncStreamsOfN(1000000, 1).subscribe(ts); - ts.awaitDone(5, TimeUnit.SECONDS); + ts.awaitDone(10, TimeUnit.SECONDS); ts.assertNoErrors(); assertEquals(1000000, ts.values().size()); } @@ -1043,7 +1043,7 @@ public class FlowableMergeTest extends RxJavaTest { }); Flowable.merge(os).subscribe(ts); - ts.awaitDone(5, TimeUnit.SECONDS); + ts.awaitDone(10, TimeUnit.SECONDS); ts.assertNoErrors(); assertEquals(10000, ts.values().size()); } @@ -1196,7 +1196,7 @@ public class FlowableMergeTest extends RxJavaTest { latch.countDown(); } }).subscribe(); - boolean a = latch.await(2, TimeUnit.SECONDS); + boolean a = latch.await(10, TimeUnit.SECONDS); if (!a) { for (String s : messages) { System.out.println("DEBUG => " + s); diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSubscribeOnTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSubscribeOnTest.java index 9cb263e23..01ee614fd 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSubscribeOnTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSubscribeOnTest.java @@ -202,7 +202,7 @@ public class FlowableSubscribeOnTest extends RxJavaTest { System.out.println("First schedule: " + t); assertTrue(t.getName().startsWith("Rx")); ts.request(10); - ts.awaitDone(5, TimeUnit.SECONDS); + ts.awaitDone(20, TimeUnit.SECONDS); System.out.println("After reschedule: " + ts.lastThread()); assertEquals(t, ts.lastThread()); } @@ -254,7 +254,7 @@ public class FlowableSubscribeOnTest extends RxJavaTest { } }).subscribeOn(Schedulers.newThread()).subscribe(ts); - ts.awaitDone(5, TimeUnit.SECONDS); + ts.awaitDone(20, TimeUnit.SECONDS); ts.assertNoErrors(); } @@ -330,7 +330,7 @@ public class FlowableSubscribeOnTest extends RxJavaTest { .subscribeOn(Schedulers.single()) .observeOn(Schedulers.computation()) .test() - .awaitDone(5, TimeUnit.SECONDS) + .awaitDone(20, TimeUnit.SECONDS) .assertNoErrors() .assertComplete(); @@ -355,7 +355,7 @@ public class FlowableSubscribeOnTest extends RxJavaTest { .subscribeOn(Schedulers.single()) .observeOn(Schedulers.computation()) .test() - .awaitDone(5, TimeUnit.SECONDS) + .awaitDone(20, TimeUnit.SECONDS) .assertValueCount(Flowable.bufferSize()) .assertNoErrors() .assertComplete(); @@ -377,7 +377,7 @@ public class FlowableSubscribeOnTest extends RxJavaTest { .subscribeOn(Schedulers.single(), false) .observeOn(Schedulers.computation()) .test() - .awaitDone(5, TimeUnit.SECONDS) + .awaitDone(20, TimeUnit.SECONDS) .assertNoErrors() .assertComplete(); @@ -402,7 +402,7 @@ public class FlowableSubscribeOnTest extends RxJavaTest { .subscribeOn(Schedulers.single(), true) .observeOn(Schedulers.computation()) .test() - .awaitDone(5, TimeUnit.SECONDS) + .awaitDone(20, TimeUnit.SECONDS) .assertValueCount(Flowable.bufferSize()) .assertNoErrors() .assertComplete(); diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatMapSchedulerTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatMapSchedulerTest.java index e6a50b9af..f2a6ede31 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatMapSchedulerTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatMapSchedulerTest.java @@ -307,7 +307,7 @@ public class ObservableConcatMapSchedulerTest { } @Test - public void issue2890NoStackoverflow() throws InterruptedException { + public void issue2890NoStackoverflow() throws InterruptedException, TimeoutException { final ExecutorService executor = Executors.newFixedThreadPool(2); final Scheduler sch = Schedulers.from(executor); @@ -352,7 +352,11 @@ public class ObservableConcatMapSchedulerTest { } }); - executor.awaitTermination(20000, TimeUnit.MILLISECONDS); + long awaitTerminationTimeout = 100_000; + if (!executor.awaitTermination(awaitTerminationTimeout, TimeUnit.MILLISECONDS)) { + throw new TimeoutException("Completed " + counter.get() + "/" + n + " before timed out after " + + awaitTerminationTimeout + " milliseconds."); + } assertEquals(n, counter.get()); } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatTest.java index 4b1462e30..d14f803ee 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatTest.java @@ -683,7 +683,7 @@ public class ObservableConcatTest extends RxJavaTest { } @Test - public void issue2890NoStackoverflow() throws InterruptedException { + public void issue2890NoStackoverflow() throws InterruptedException, TimeoutException { final ExecutorService executor = Executors.newFixedThreadPool(2); final Scheduler sch = Schedulers.from(executor); @@ -728,7 +728,11 @@ public class ObservableConcatTest extends RxJavaTest { } }); - executor.awaitTermination(20000, TimeUnit.MILLISECONDS); + long awaitTerminationTimeout = 100_000; + if (!executor.awaitTermination(awaitTerminationTimeout, TimeUnit.MILLISECONDS)) { + throw new TimeoutException("Completed " + counter.get() + "/" + n + " before timed out after " + + awaitTerminationTimeout + " milliseconds."); + } assertEquals(n, counter.get()); } diff --git a/src/test/java/io/reactivex/rxjava3/subjects/UnicastSubjectTest.java b/src/test/java/io/reactivex/rxjava3/subjects/UnicastSubjectTest.java index cf1f5d4a7..85cb20b48 100644 --- a/src/test/java/io/reactivex/rxjava3/subjects/UnicastSubjectTest.java +++ b/src/test/java/io/reactivex/rxjava3/subjects/UnicastSubjectTest.java @@ -481,9 +481,7 @@ public class UnicastSubjectTest extends SubjectTest { us.onNext(i); } - to - .awaitDone(5, TimeUnit.SECONDS) - ; + to.awaitDone(10, TimeUnit.SECONDS); if (!errors.isEmpty()) { throw new CompositeException(errors); -- GitLab