diff --git a/src/share/classes/java/util/stream/DoublePipeline.java b/src/share/classes/java/util/stream/DoublePipeline.java index 3e9cddbd324ee389e7950f06b8b6fc11efb087fd..ce626640d015a4d9b3ad3c4c1011bc911b0dba57 100644 --- a/src/share/classes/java/util/stream/DoublePipeline.java +++ b/src/share/classes/java/util/stream/DoublePipeline.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013, 2014, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2013, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -260,6 +260,12 @@ abstract class DoublePipeline @Override Sink opWrapSink(int flags, Sink sink) { return new Sink.ChainedDouble(sink) { + // true if cancellationRequested() has been called + boolean cancellationRequestedCalled; + + // cache the consumer to avoid creation on every accepted element + DoubleConsumer downstreamAsDouble = downstream::accept; + @Override public void begin(long size) { downstream.begin(-1); @@ -268,11 +274,27 @@ abstract class DoublePipeline @Override public void accept(double t) { try (DoubleStream result = mapper.apply(t)) { - // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it - if (result != null) - result.sequential().forEach(i -> downstream.accept(i)); + if (result != null) { + if (!cancellationRequestedCalled) { + result.sequential().forEach(downstreamAsDouble); + } + else { + Spliterator.OfDouble s = result.sequential().spliterator(); + do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble)); + } + } } } + + @Override + public boolean cancellationRequested() { + // If this method is called then an operation within the stream + // pipeline is short-circuiting (see AbstractPipeline.copyInto). + // Note that we cannot differentiate between an upstream or + // downstream operation + cancellationRequestedCalled = true; + return downstream.cancellationRequested(); + } }; } }; diff --git a/src/share/classes/java/util/stream/IntPipeline.java b/src/share/classes/java/util/stream/IntPipeline.java index 313045f96c70d04b6154eef53371ea557e6cf852..3f809ea4cc12de52ab411cd8463c3f45f358929f 100644 --- a/src/share/classes/java/util/stream/IntPipeline.java +++ b/src/share/classes/java/util/stream/IntPipeline.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2014, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -296,6 +296,12 @@ abstract class IntPipeline @Override Sink opWrapSink(int flags, Sink sink) { return new Sink.ChainedInt(sink) { + // true if cancellationRequested() has been called + boolean cancellationRequestedCalled; + + // cache the consumer to avoid creation on every accepted element + IntConsumer downstreamAsInt = downstream::accept; + @Override public void begin(long size) { downstream.begin(-1); @@ -304,11 +310,27 @@ abstract class IntPipeline @Override public void accept(int t) { try (IntStream result = mapper.apply(t)) { - // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it - if (result != null) - result.sequential().forEach(i -> downstream.accept(i)); + if (result != null) { + if (!cancellationRequestedCalled) { + result.sequential().forEach(downstreamAsInt); + } + else { + Spliterator.OfInt s = result.sequential().spliterator(); + do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsInt)); + } + } } } + + @Override + public boolean cancellationRequested() { + // If this method is called then an operation within the stream + // pipeline is short-circuiting (see AbstractPipeline.copyInto). + // Note that we cannot differentiate between an upstream or + // downstream operation + cancellationRequestedCalled = true; + return downstream.cancellationRequested(); + } }; } }; diff --git a/src/share/classes/java/util/stream/LongPipeline.java b/src/share/classes/java/util/stream/LongPipeline.java index fab01a21118af3718fb1302d71d247788fde2d65..02318175aff2443f25bf381638a8b193e63bc1ca 100644 --- a/src/share/classes/java/util/stream/LongPipeline.java +++ b/src/share/classes/java/util/stream/LongPipeline.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013, 2014, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2013, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -277,6 +277,12 @@ abstract class LongPipeline @Override Sink opWrapSink(int flags, Sink sink) { return new Sink.ChainedLong(sink) { + // true if cancellationRequested() has been called + boolean cancellationRequestedCalled; + + // cache the consumer to avoid creation on every accepted element + LongConsumer downstreamAsLong = downstream::accept; + @Override public void begin(long size) { downstream.begin(-1); @@ -285,11 +291,27 @@ abstract class LongPipeline @Override public void accept(long t) { try (LongStream result = mapper.apply(t)) { - // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it - if (result != null) - result.sequential().forEach(i -> downstream.accept(i)); + if (result != null) { + if (!cancellationRequestedCalled) { + result.sequential().forEach(downstreamAsLong); + } + else { + Spliterator.OfLong s = result.sequential().spliterator(); + do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsLong)); + } + } } } + + @Override + public boolean cancellationRequested() { + // If this method is called then an operation within the stream + // pipeline is short-circuiting (see AbstractPipeline.copyInto). + // Note that we cannot differentiate between an upstream or + // downstream operation + cancellationRequestedCalled = true; + return downstream.cancellationRequested(); + } }; } }; diff --git a/src/share/classes/java/util/stream/ReferencePipeline.java b/src/share/classes/java/util/stream/ReferencePipeline.java index 8f5da0e55e7b98113a3cc0683434dcb47a55d4fd..abb435466bf2fd7ea1a294b907a02f0c4c817b29 100644 --- a/src/share/classes/java/util/stream/ReferencePipeline.java +++ b/src/share/classes/java/util/stream/ReferencePipeline.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -251,12 +251,14 @@ abstract class ReferencePipeline @Override public final Stream flatMap(Function> mapper) { Objects.requireNonNull(mapper); - // We can do better than this, by polling cancellationRequested when stream is infinite return new StatelessOp(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { @Override Sink opWrapSink(int flags, Sink sink) { return new Sink.ChainedReference(sink) { + // true if cancellationRequested() has been called + boolean cancellationRequestedCalled; + @Override public void begin(long size) { downstream.begin(-1); @@ -265,11 +267,27 @@ abstract class ReferencePipeline @Override public void accept(P_OUT u) { try (Stream result = mapper.apply(u)) { - // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it - if (result != null) - result.sequential().forEach(downstream); + if (result != null) { + if (!cancellationRequestedCalled) { + result.sequential().forEach(downstream); + } + else { + Spliterator s = result.sequential().spliterator(); + do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream)); + } + } } } + + @Override + public boolean cancellationRequested() { + // If this method is called then an operation within the stream + // pipeline is short-circuiting (see AbstractPipeline.copyInto). + // Note that we cannot differentiate between an upstream or + // downstream operation + cancellationRequestedCalled = true; + return downstream.cancellationRequested(); + } }; } }; @@ -278,13 +296,17 @@ abstract class ReferencePipeline @Override public final IntStream flatMapToInt(Function mapper) { Objects.requireNonNull(mapper); - // We can do better than this, by polling cancellationRequested when stream is infinite return new IntPipeline.StatelessOp(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { @Override Sink opWrapSink(int flags, Sink sink) { return new Sink.ChainedReference(sink) { + // true if cancellationRequested() has been called + boolean cancellationRequestedCalled; + + // cache the consumer to avoid creation on every accepted element IntConsumer downstreamAsInt = downstream::accept; + @Override public void begin(long size) { downstream.begin(-1); @@ -293,11 +315,23 @@ abstract class ReferencePipeline @Override public void accept(P_OUT u) { try (IntStream result = mapper.apply(u)) { - // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it - if (result != null) - result.sequential().forEach(downstreamAsInt); + if (result != null) { + if (!cancellationRequestedCalled) { + result.sequential().forEach(downstreamAsInt); + } + else { + Spliterator.OfInt s = result.sequential().spliterator(); + do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsInt)); + } + } } } + + @Override + public boolean cancellationRequested() { + cancellationRequestedCalled = true; + return downstream.cancellationRequested(); + } }; } }; @@ -306,13 +340,17 @@ abstract class ReferencePipeline @Override public final DoubleStream flatMapToDouble(Function mapper) { Objects.requireNonNull(mapper); - // We can do better than this, by polling cancellationRequested when stream is infinite return new DoublePipeline.StatelessOp(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { @Override Sink opWrapSink(int flags, Sink sink) { return new Sink.ChainedReference(sink) { + // true if cancellationRequested() has been called + boolean cancellationRequestedCalled; + + // cache the consumer to avoid creation on every accepted element DoubleConsumer downstreamAsDouble = downstream::accept; + @Override public void begin(long size) { downstream.begin(-1); @@ -321,11 +359,23 @@ abstract class ReferencePipeline @Override public void accept(P_OUT u) { try (DoubleStream result = mapper.apply(u)) { - // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it - if (result != null) - result.sequential().forEach(downstreamAsDouble); + if (result != null) { + if (!cancellationRequestedCalled) { + result.sequential().forEach(downstreamAsDouble); + } + else { + Spliterator.OfDouble s = result.sequential().spliterator(); + do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble)); + } + } } } + + @Override + public boolean cancellationRequested() { + cancellationRequestedCalled = true; + return downstream.cancellationRequested(); + } }; } }; @@ -340,7 +390,12 @@ abstract class ReferencePipeline @Override Sink opWrapSink(int flags, Sink sink) { return new Sink.ChainedReference(sink) { + // true if cancellationRequested() has been called + boolean cancellationRequestedCalled; + + // cache the consumer to avoid creation on every accepted element LongConsumer downstreamAsLong = downstream::accept; + @Override public void begin(long size) { downstream.begin(-1); @@ -349,11 +404,23 @@ abstract class ReferencePipeline @Override public void accept(P_OUT u) { try (LongStream result = mapper.apply(u)) { - // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it - if (result != null) - result.sequential().forEach(downstreamAsLong); + if (result != null) { + if (!cancellationRequestedCalled) { + result.sequential().forEach(downstreamAsLong); + } + else { + Spliterator.OfLong s = result.sequential().spliterator(); + do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsLong)); + } + } } } + + @Override + public boolean cancellationRequested() { + cancellationRequestedCalled = true; + return downstream.cancellationRequested(); + } }; } }; diff --git a/src/share/classes/java/util/stream/SortedOps.java b/src/share/classes/java/util/stream/SortedOps.java index 592b609cad57f60c582e64484ed90f2c10fc7371..31e806dd56ffb18f86a63a461bfcdf28ea0e4ff9 100644 --- a/src/share/classes/java/util/stream/SortedOps.java +++ b/src/share/classes/java/util/stream/SortedOps.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -304,7 +304,8 @@ final class SortedOps { private static abstract class AbstractRefSortingSink extends Sink.ChainedReference { protected final Comparator comparator; // @@@ could be a lazy final value, if/when support is added - protected boolean cancellationWasRequested; + // true if cancellationRequested() has been called + protected boolean cancellationRequestedCalled; AbstractRefSortingSink(Sink downstream, Comparator comparator) { super(downstream); @@ -319,7 +320,11 @@ final class SortedOps { */ @Override public final boolean cancellationRequested() { - cancellationWasRequested = true; + // If this method is called then an operation within the stream + // pipeline is short-circuiting (see AbstractPipeline.copyInto). + // Note that we cannot differentiate between an upstream or + // downstream operation + cancellationRequestedCalled = true; return false; } } @@ -347,7 +352,7 @@ final class SortedOps { public void end() { Arrays.sort(array, 0, offset, comparator); downstream.begin(offset); - if (!cancellationWasRequested) { + if (!cancellationRequestedCalled) { for (int i = 0; i < offset; i++) downstream.accept(array[i]); } @@ -386,7 +391,7 @@ final class SortedOps { public void end() { list.sort(comparator); downstream.begin(list.size()); - if (!cancellationWasRequested) { + if (!cancellationRequestedCalled) { list.forEach(downstream::accept); } else { @@ -409,7 +414,8 @@ final class SortedOps { * Abstract {@link Sink} for implementing sort on int streams. */ private static abstract class AbstractIntSortingSink extends Sink.ChainedInt { - protected boolean cancellationWasRequested; + // true if cancellationRequested() has been called + protected boolean cancellationRequestedCalled; AbstractIntSortingSink(Sink downstream) { super(downstream); @@ -417,7 +423,7 @@ final class SortedOps { @Override public final boolean cancellationRequested() { - cancellationWasRequested = true; + cancellationRequestedCalled = true; return false; } } @@ -444,7 +450,7 @@ final class SortedOps { public void end() { Arrays.sort(array, 0, offset); downstream.begin(offset); - if (!cancellationWasRequested) { + if (!cancellationRequestedCalled) { for (int i = 0; i < offset; i++) downstream.accept(array[i]); } @@ -484,7 +490,7 @@ final class SortedOps { int[] ints = b.asPrimitiveArray(); Arrays.sort(ints); downstream.begin(ints.length); - if (!cancellationWasRequested) { + if (!cancellationRequestedCalled) { for (int anInt : ints) downstream.accept(anInt); } @@ -507,7 +513,8 @@ final class SortedOps { * Abstract {@link Sink} for implementing sort on long streams. */ private static abstract class AbstractLongSortingSink extends Sink.ChainedLong { - protected boolean cancellationWasRequested; + // true if cancellationRequested() has been called + protected boolean cancellationRequestedCalled; AbstractLongSortingSink(Sink downstream) { super(downstream); @@ -515,7 +522,7 @@ final class SortedOps { @Override public final boolean cancellationRequested() { - cancellationWasRequested = true; + cancellationRequestedCalled = true; return false; } } @@ -542,7 +549,7 @@ final class SortedOps { public void end() { Arrays.sort(array, 0, offset); downstream.begin(offset); - if (!cancellationWasRequested) { + if (!cancellationRequestedCalled) { for (int i = 0; i < offset; i++) downstream.accept(array[i]); } @@ -582,7 +589,7 @@ final class SortedOps { long[] longs = b.asPrimitiveArray(); Arrays.sort(longs); downstream.begin(longs.length); - if (!cancellationWasRequested) { + if (!cancellationRequestedCalled) { for (long aLong : longs) downstream.accept(aLong); } @@ -605,7 +612,8 @@ final class SortedOps { * Abstract {@link Sink} for implementing sort on long streams. */ private static abstract class AbstractDoubleSortingSink extends Sink.ChainedDouble { - protected boolean cancellationWasRequested; + // true if cancellationRequested() has been called + protected boolean cancellationRequestedCalled; AbstractDoubleSortingSink(Sink downstream) { super(downstream); @@ -613,7 +621,7 @@ final class SortedOps { @Override public final boolean cancellationRequested() { - cancellationWasRequested = true; + cancellationRequestedCalled = true; return false; } } @@ -640,7 +648,7 @@ final class SortedOps { public void end() { Arrays.sort(array, 0, offset); downstream.begin(offset); - if (!cancellationWasRequested) { + if (!cancellationRequestedCalled) { for (int i = 0; i < offset; i++) downstream.accept(array[i]); } @@ -680,7 +688,7 @@ final class SortedOps { double[] doubles = b.asPrimitiveArray(); Arrays.sort(doubles); downstream.begin(doubles.length); - if (!cancellationWasRequested) { + if (!cancellationRequestedCalled) { for (double aDouble : doubles) downstream.accept(aDouble); } diff --git a/test/java/util/stream/test/org/openjdk/tests/java/util/stream/FlatMapOpTest.java b/test/java/util/stream/test/org/openjdk/tests/java/util/stream/FlatMapOpTest.java index 0abe67e4d4a5b04dd45f8228026ba22d596f965c..3074e07229c8a573e757eda984c5655b5b472560 100644 --- a/test/java/util/stream/test/org/openjdk/tests/java/util/stream/FlatMapOpTest.java +++ b/test/java/util/stream/test/org/openjdk/tests/java/util/stream/FlatMapOpTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -24,7 +24,7 @@ /* * @test * @summary flat-map operations - * @bug 8044047 8076458 + * @bug 8044047 8076458 8075939 */ package org.openjdk.tests.java.util.stream; @@ -36,6 +36,7 @@ import java.util.Collection; import java.util.Collections; import java.util.function.Function; import java.util.stream.*; +import java.util.concurrent.atomic.AtomicInteger; import static java.util.stream.LambdaTestHelpers.*; import static java.util.stream.ThowableHelper.checkNPE; @@ -43,6 +44,7 @@ import static java.util.stream.ThowableHelper.checkNPE; @Test public class FlatMapOpTest extends OpTestCase { + @Test public void testNullMapper() { checkNPE(() -> Stream.of(1).flatMap(null)); checkNPE(() -> IntStream.of(1).flatMap(null)); @@ -53,6 +55,7 @@ public class FlatMapOpTest extends OpTestCase { static final Function> integerRangeMapper = e -> IntStream.range(0, e).boxed(); + @Test public void testFlatMap() { String[] stringsArray = {"hello", "there", "", "yada"}; Stream strings = Arrays.asList(stringsArray).stream(); @@ -85,11 +88,24 @@ public class FlatMapOpTest extends OpTestCase { exerciseOps(data, s -> s.flatMap((Integer e) -> IntStream.range(0, e).boxed().limit(10))); } + @Test + public void testOpsShortCircuit() { + AtomicInteger count = new AtomicInteger(); + Stream.of(0).flatMap(i -> IntStream.range(0, 100).boxed()). + peek(i -> count.incrementAndGet()). + limit(10).toArray(); + assertEquals(count.get(), 10); + } + // @Test(dataProvider = "IntStreamTestData", dataProviderClass = IntStreamTestDataProvider.class) public void testIntOps(String name, TestData.OfInt data) { - Collection result = exerciseOps(data, s -> s.flatMap(i -> Collections.singleton(i).stream().mapToInt(j -> j))); + Collection result = exerciseOps(data, s -> s.flatMap(IntStream::of)); + assertEquals(data.size(), result.size()); + assertContents(data, result); + + result = exerciseOps(data, s -> s.boxed().flatMapToInt(IntStream::of)); assertEquals(data.size(), result.size()); assertContents(data, result); @@ -101,13 +117,35 @@ public class FlatMapOpTest extends OpTestCase { public void testIntOpsX(String name, TestData.OfInt data) { exerciseOps(data, s -> s.flatMap(e -> IntStream.range(0, e))); exerciseOps(data, s -> s.flatMap(e -> IntStream.range(0, e).limit(10))); + + exerciseOps(data, s -> s.boxed().flatMapToInt(e -> IntStream.range(0, e))); + exerciseOps(data, s -> s.boxed().flatMapToInt(e -> IntStream.range(0, e).limit(10))); + } + + @Test + public void testIntOpsShortCircuit() { + AtomicInteger count = new AtomicInteger(); + IntStream.of(0).flatMap(i -> IntStream.range(0, 100)). + peek(i -> count.incrementAndGet()). + limit(10).toArray(); + assertEquals(count.get(), 10); + + count.set(0); + Stream.of(0).flatMapToInt(i -> IntStream.range(0, 100)). + peek(i -> count.incrementAndGet()). + limit(10).toArray(); + assertEquals(count.get(), 10); } // @Test(dataProvider = "LongStreamTestData", dataProviderClass = LongStreamTestDataProvider.class) public void testLongOps(String name, TestData.OfLong data) { - Collection result = exerciseOps(data, s -> s.flatMap(i -> Collections.singleton(i).stream().mapToLong(j -> j))); + Collection result = exerciseOps(data, s -> s.flatMap(LongStream::of)); + assertEquals(data.size(), result.size()); + assertContents(data, result); + + result = exerciseOps(data, s -> s.boxed().flatMapToLong(LongStream::of)); assertEquals(data.size(), result.size()); assertContents(data, result); @@ -121,11 +159,30 @@ public class FlatMapOpTest extends OpTestCase { exerciseOps(data, s -> s.flatMap(e -> LongStream.range(0, e).limit(10))); } + @Test + public void testLongOpsShortCircuit() { + AtomicInteger count = new AtomicInteger(); + LongStream.of(0).flatMap(i -> LongStream.range(0, 100)). + peek(i -> count.incrementAndGet()). + limit(10).toArray(); + assertEquals(count.get(), 10); + + count.set(0); + Stream.of(0).flatMapToLong(i -> LongStream.range(0, 100)). + peek(i -> count.incrementAndGet()). + limit(10).toArray(); + assertEquals(count.get(), 10); + } + // @Test(dataProvider = "DoubleStreamTestData", dataProviderClass = DoubleStreamTestDataProvider.class) public void testDoubleOps(String name, TestData.OfDouble data) { - Collection result = exerciseOps(data, s -> s.flatMap(i -> Collections.singleton(i).stream().mapToDouble(j -> j))); + Collection result = exerciseOps(data, s -> s.flatMap(DoubleStream::of)); + assertEquals(data.size(), result.size()); + assertContents(data, result); + + result = exerciseOps(data, s -> s.boxed().flatMapToDouble(DoubleStream::of)); assertEquals(data.size(), result.size()); assertContents(data, result); @@ -138,4 +195,19 @@ public class FlatMapOpTest extends OpTestCase { exerciseOps(data, s -> s.flatMap(e -> IntStream.range(0, (int) e).asDoubleStream())); exerciseOps(data, s -> s.flatMap(e -> IntStream.range(0, (int) e).limit(10).asDoubleStream())); } + + @Test + public void testDoubleOpsShortCircuit() { + AtomicInteger count = new AtomicInteger(); + DoubleStream.of(0).flatMap(i -> IntStream.range(0, 100).asDoubleStream()). + peek(i -> count.incrementAndGet()). + limit(10).toArray(); + assertEquals(count.get(), 10); + + count.set(0); + Stream.of(0).flatMapToDouble(i -> IntStream.range(0, 100).asDoubleStream()). + peek(i -> count.incrementAndGet()). + limit(10).toArray(); + assertEquals(count.get(), 10); + } }