提交 da485e25 编写于 作者: S shade

8075939: Stream.flatMap() causes breaking of short-circuiting of terminal operations

Reviewed-by: forax, smarks, andrew
上级 8844d081
/*
* Copyright (c) 2013, 2014, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2013, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
......@@ -260,6 +260,12 @@ abstract class DoublePipeline<E_IN>
@Override
Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
return new Sink.ChainedDouble<Double>(sink) {
// true if cancellationRequested() has been called
boolean cancellationRequestedCalled;
// cache the consumer to avoid creation on every accepted element
DoubleConsumer downstreamAsDouble = downstream::accept;
@Override
public void begin(long size) {
downstream.begin(-1);
......@@ -268,11 +274,27 @@ abstract class DoublePipeline<E_IN>
@Override
public void accept(double t) {
try (DoubleStream result = mapper.apply(t)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(i -> downstream.accept(i));
if (result != null) {
if (!cancellationRequestedCalled) {
result.sequential().forEach(downstreamAsDouble);
}
else {
Spliterator.OfDouble s = result.sequential().spliterator();
do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble));
}
}
}
}
@Override
public boolean cancellationRequested() {
// If this method is called then an operation within the stream
// pipeline is short-circuiting (see AbstractPipeline.copyInto).
// Note that we cannot differentiate between an upstream or
// downstream operation
cancellationRequestedCalled = true;
return downstream.cancellationRequested();
}
};
}
};
......
/*
* Copyright (c) 2012, 2014, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
......@@ -296,6 +296,12 @@ abstract class IntPipeline<E_IN>
@Override
Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
return new Sink.ChainedInt<Integer>(sink) {
// true if cancellationRequested() has been called
boolean cancellationRequestedCalled;
// cache the consumer to avoid creation on every accepted element
IntConsumer downstreamAsInt = downstream::accept;
@Override
public void begin(long size) {
downstream.begin(-1);
......@@ -304,11 +310,27 @@ abstract class IntPipeline<E_IN>
@Override
public void accept(int t) {
try (IntStream result = mapper.apply(t)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(i -> downstream.accept(i));
if (result != null) {
if (!cancellationRequestedCalled) {
result.sequential().forEach(downstreamAsInt);
}
else {
Spliterator.OfInt s = result.sequential().spliterator();
do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsInt));
}
}
}
}
@Override
public boolean cancellationRequested() {
// If this method is called then an operation within the stream
// pipeline is short-circuiting (see AbstractPipeline.copyInto).
// Note that we cannot differentiate between an upstream or
// downstream operation
cancellationRequestedCalled = true;
return downstream.cancellationRequested();
}
};
}
};
......
/*
* Copyright (c) 2013, 2014, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2013, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
......@@ -277,6 +277,12 @@ abstract class LongPipeline<E_IN>
@Override
Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
return new Sink.ChainedLong<Long>(sink) {
// true if cancellationRequested() has been called
boolean cancellationRequestedCalled;
// cache the consumer to avoid creation on every accepted element
LongConsumer downstreamAsLong = downstream::accept;
@Override
public void begin(long size) {
downstream.begin(-1);
......@@ -285,11 +291,27 @@ abstract class LongPipeline<E_IN>
@Override
public void accept(long t) {
try (LongStream result = mapper.apply(t)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(i -> downstream.accept(i));
if (result != null) {
if (!cancellationRequestedCalled) {
result.sequential().forEach(downstreamAsLong);
}
else {
Spliterator.OfLong s = result.sequential().spliterator();
do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsLong));
}
}
}
}
@Override
public boolean cancellationRequested() {
// If this method is called then an operation within the stream
// pipeline is short-circuiting (see AbstractPipeline.copyInto).
// Note that we cannot differentiate between an upstream or
// downstream operation
cancellationRequestedCalled = true;
return downstream.cancellationRequested();
}
};
}
};
......
/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
......@@ -251,12 +251,14 @@ abstract class ReferencePipeline<P_IN, P_OUT>
@Override
public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
Objects.requireNonNull(mapper);
// We can do better than this, by polling cancellationRequested when stream is infinite
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
// true if cancellationRequested() has been called
boolean cancellationRequestedCalled;
@Override
public void begin(long size) {
downstream.begin(-1);
......@@ -265,11 +267,27 @@ abstract class ReferencePipeline<P_IN, P_OUT>
@Override
public void accept(P_OUT u) {
try (Stream<? extends R> result = mapper.apply(u)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(downstream);
if (result != null) {
if (!cancellationRequestedCalled) {
result.sequential().forEach(downstream);
}
else {
Spliterator<? extends R> s = result.sequential().spliterator();
do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream));
}
}
}
}
@Override
public boolean cancellationRequested() {
// If this method is called then an operation within the stream
// pipeline is short-circuiting (see AbstractPipeline.copyInto).
// Note that we cannot differentiate between an upstream or
// downstream operation
cancellationRequestedCalled = true;
return downstream.cancellationRequested();
}
};
}
};
......@@ -278,13 +296,17 @@ abstract class ReferencePipeline<P_IN, P_OUT>
@Override
public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) {
Objects.requireNonNull(mapper);
// We can do better than this, by polling cancellationRequested when stream is infinite
return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
return new Sink.ChainedReference<P_OUT, Integer>(sink) {
// true if cancellationRequested() has been called
boolean cancellationRequestedCalled;
// cache the consumer to avoid creation on every accepted element
IntConsumer downstreamAsInt = downstream::accept;
@Override
public void begin(long size) {
downstream.begin(-1);
......@@ -293,11 +315,23 @@ abstract class ReferencePipeline<P_IN, P_OUT>
@Override
public void accept(P_OUT u) {
try (IntStream result = mapper.apply(u)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(downstreamAsInt);
if (result != null) {
if (!cancellationRequestedCalled) {
result.sequential().forEach(downstreamAsInt);
}
else {
Spliterator.OfInt s = result.sequential().spliterator();
do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsInt));
}
}
}
}
@Override
public boolean cancellationRequested() {
cancellationRequestedCalled = true;
return downstream.cancellationRequested();
}
};
}
};
......@@ -306,13 +340,17 @@ abstract class ReferencePipeline<P_IN, P_OUT>
@Override
public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) {
Objects.requireNonNull(mapper);
// We can do better than this, by polling cancellationRequested when stream is infinite
return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
return new Sink.ChainedReference<P_OUT, Double>(sink) {
// true if cancellationRequested() has been called
boolean cancellationRequestedCalled;
// cache the consumer to avoid creation on every accepted element
DoubleConsumer downstreamAsDouble = downstream::accept;
@Override
public void begin(long size) {
downstream.begin(-1);
......@@ -321,11 +359,23 @@ abstract class ReferencePipeline<P_IN, P_OUT>
@Override
public void accept(P_OUT u) {
try (DoubleStream result = mapper.apply(u)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(downstreamAsDouble);
if (result != null) {
if (!cancellationRequestedCalled) {
result.sequential().forEach(downstreamAsDouble);
}
else {
Spliterator.OfDouble s = result.sequential().spliterator();
do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble));
}
}
}
}
@Override
public boolean cancellationRequested() {
cancellationRequestedCalled = true;
return downstream.cancellationRequested();
}
};
}
};
......@@ -340,7 +390,12 @@ abstract class ReferencePipeline<P_IN, P_OUT>
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
return new Sink.ChainedReference<P_OUT, Long>(sink) {
// true if cancellationRequested() has been called
boolean cancellationRequestedCalled;
// cache the consumer to avoid creation on every accepted element
LongConsumer downstreamAsLong = downstream::accept;
@Override
public void begin(long size) {
downstream.begin(-1);
......@@ -349,11 +404,23 @@ abstract class ReferencePipeline<P_IN, P_OUT>
@Override
public void accept(P_OUT u) {
try (LongStream result = mapper.apply(u)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(downstreamAsLong);
if (result != null) {
if (!cancellationRequestedCalled) {
result.sequential().forEach(downstreamAsLong);
}
else {
Spliterator.OfLong s = result.sequential().spliterator();
do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsLong));
}
}
}
}
@Override
public boolean cancellationRequested() {
cancellationRequestedCalled = true;
return downstream.cancellationRequested();
}
};
}
};
......
/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
......@@ -304,7 +304,8 @@ final class SortedOps {
private static abstract class AbstractRefSortingSink<T> extends Sink.ChainedReference<T, T> {
protected final Comparator<? super T> comparator;
// @@@ could be a lazy final value, if/when support is added
protected boolean cancellationWasRequested;
// true if cancellationRequested() has been called
protected boolean cancellationRequestedCalled;
AbstractRefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
super(downstream);
......@@ -319,7 +320,11 @@ final class SortedOps {
*/
@Override
public final boolean cancellationRequested() {
cancellationWasRequested = true;
// If this method is called then an operation within the stream
// pipeline is short-circuiting (see AbstractPipeline.copyInto).
// Note that we cannot differentiate between an upstream or
// downstream operation
cancellationRequestedCalled = true;
return false;
}
}
......@@ -347,7 +352,7 @@ final class SortedOps {
public void end() {
Arrays.sort(array, 0, offset, comparator);
downstream.begin(offset);
if (!cancellationWasRequested) {
if (!cancellationRequestedCalled) {
for (int i = 0; i < offset; i++)
downstream.accept(array[i]);
}
......@@ -386,7 +391,7 @@ final class SortedOps {
public void end() {
list.sort(comparator);
downstream.begin(list.size());
if (!cancellationWasRequested) {
if (!cancellationRequestedCalled) {
list.forEach(downstream::accept);
}
else {
......@@ -409,7 +414,8 @@ final class SortedOps {
* Abstract {@link Sink} for implementing sort on int streams.
*/
private static abstract class AbstractIntSortingSink extends Sink.ChainedInt<Integer> {
protected boolean cancellationWasRequested;
// true if cancellationRequested() has been called
protected boolean cancellationRequestedCalled;
AbstractIntSortingSink(Sink<? super Integer> downstream) {
super(downstream);
......@@ -417,7 +423,7 @@ final class SortedOps {
@Override
public final boolean cancellationRequested() {
cancellationWasRequested = true;
cancellationRequestedCalled = true;
return false;
}
}
......@@ -444,7 +450,7 @@ final class SortedOps {
public void end() {
Arrays.sort(array, 0, offset);
downstream.begin(offset);
if (!cancellationWasRequested) {
if (!cancellationRequestedCalled) {
for (int i = 0; i < offset; i++)
downstream.accept(array[i]);
}
......@@ -484,7 +490,7 @@ final class SortedOps {
int[] ints = b.asPrimitiveArray();
Arrays.sort(ints);
downstream.begin(ints.length);
if (!cancellationWasRequested) {
if (!cancellationRequestedCalled) {
for (int anInt : ints)
downstream.accept(anInt);
}
......@@ -507,7 +513,8 @@ final class SortedOps {
* Abstract {@link Sink} for implementing sort on long streams.
*/
private static abstract class AbstractLongSortingSink extends Sink.ChainedLong<Long> {
protected boolean cancellationWasRequested;
// true if cancellationRequested() has been called
protected boolean cancellationRequestedCalled;
AbstractLongSortingSink(Sink<? super Long> downstream) {
super(downstream);
......@@ -515,7 +522,7 @@ final class SortedOps {
@Override
public final boolean cancellationRequested() {
cancellationWasRequested = true;
cancellationRequestedCalled = true;
return false;
}
}
......@@ -542,7 +549,7 @@ final class SortedOps {
public void end() {
Arrays.sort(array, 0, offset);
downstream.begin(offset);
if (!cancellationWasRequested) {
if (!cancellationRequestedCalled) {
for (int i = 0; i < offset; i++)
downstream.accept(array[i]);
}
......@@ -582,7 +589,7 @@ final class SortedOps {
long[] longs = b.asPrimitiveArray();
Arrays.sort(longs);
downstream.begin(longs.length);
if (!cancellationWasRequested) {
if (!cancellationRequestedCalled) {
for (long aLong : longs)
downstream.accept(aLong);
}
......@@ -605,7 +612,8 @@ final class SortedOps {
* Abstract {@link Sink} for implementing sort on long streams.
*/
private static abstract class AbstractDoubleSortingSink extends Sink.ChainedDouble<Double> {
protected boolean cancellationWasRequested;
// true if cancellationRequested() has been called
protected boolean cancellationRequestedCalled;
AbstractDoubleSortingSink(Sink<? super Double> downstream) {
super(downstream);
......@@ -613,7 +621,7 @@ final class SortedOps {
@Override
public final boolean cancellationRequested() {
cancellationWasRequested = true;
cancellationRequestedCalled = true;
return false;
}
}
......@@ -640,7 +648,7 @@ final class SortedOps {
public void end() {
Arrays.sort(array, 0, offset);
downstream.begin(offset);
if (!cancellationWasRequested) {
if (!cancellationRequestedCalled) {
for (int i = 0; i < offset; i++)
downstream.accept(array[i]);
}
......@@ -680,7 +688,7 @@ final class SortedOps {
double[] doubles = b.asPrimitiveArray();
Arrays.sort(doubles);
downstream.begin(doubles.length);
if (!cancellationWasRequested) {
if (!cancellationRequestedCalled) {
for (double aDouble : doubles)
downstream.accept(aDouble);
}
......
/*
* Copyright (c) 2012, 2016, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
......@@ -24,7 +24,7 @@
/*
* @test
* @summary flat-map operations
* @bug 8044047 8076458
* @bug 8044047 8076458 8075939
*/
package org.openjdk.tests.java.util.stream;
......@@ -36,6 +36,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.function.Function;
import java.util.stream.*;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.stream.LambdaTestHelpers.*;
import static java.util.stream.ThowableHelper.checkNPE;
......@@ -43,6 +44,7 @@ import static java.util.stream.ThowableHelper.checkNPE;
@Test
public class FlatMapOpTest extends OpTestCase {
@Test
public void testNullMapper() {
checkNPE(() -> Stream.of(1).flatMap(null));
checkNPE(() -> IntStream.of(1).flatMap(null));
......@@ -53,6 +55,7 @@ public class FlatMapOpTest extends OpTestCase {
static final Function<Integer, Stream<Integer>> integerRangeMapper
= e -> IntStream.range(0, e).boxed();
@Test
public void testFlatMap() {
String[] stringsArray = {"hello", "there", "", "yada"};
Stream<String> strings = Arrays.asList(stringsArray).stream();
......@@ -85,11 +88,24 @@ public class FlatMapOpTest extends OpTestCase {
exerciseOps(data, s -> s.flatMap((Integer e) -> IntStream.range(0, e).boxed().limit(10)));
}
@Test
public void testOpsShortCircuit() {
AtomicInteger count = new AtomicInteger();
Stream.of(0).flatMap(i -> IntStream.range(0, 100).boxed()).
peek(i -> count.incrementAndGet()).
limit(10).toArray();
assertEquals(count.get(), 10);
}
//
@Test(dataProvider = "IntStreamTestData", dataProviderClass = IntStreamTestDataProvider.class)
public void testIntOps(String name, TestData.OfInt data) {
Collection<Integer> result = exerciseOps(data, s -> s.flatMap(i -> Collections.singleton(i).stream().mapToInt(j -> j)));
Collection<Integer> result = exerciseOps(data, s -> s.flatMap(IntStream::of));
assertEquals(data.size(), result.size());
assertContents(data, result);
result = exerciseOps(data, s -> s.boxed().flatMapToInt(IntStream::of));
assertEquals(data.size(), result.size());
assertContents(data, result);
......@@ -101,13 +117,35 @@ public class FlatMapOpTest extends OpTestCase {
public void testIntOpsX(String name, TestData.OfInt data) {
exerciseOps(data, s -> s.flatMap(e -> IntStream.range(0, e)));
exerciseOps(data, s -> s.flatMap(e -> IntStream.range(0, e).limit(10)));
exerciseOps(data, s -> s.boxed().flatMapToInt(e -> IntStream.range(0, e)));
exerciseOps(data, s -> s.boxed().flatMapToInt(e -> IntStream.range(0, e).limit(10)));
}
@Test
public void testIntOpsShortCircuit() {
AtomicInteger count = new AtomicInteger();
IntStream.of(0).flatMap(i -> IntStream.range(0, 100)).
peek(i -> count.incrementAndGet()).
limit(10).toArray();
assertEquals(count.get(), 10);
count.set(0);
Stream.of(0).flatMapToInt(i -> IntStream.range(0, 100)).
peek(i -> count.incrementAndGet()).
limit(10).toArray();
assertEquals(count.get(), 10);
}
//
@Test(dataProvider = "LongStreamTestData", dataProviderClass = LongStreamTestDataProvider.class)
public void testLongOps(String name, TestData.OfLong data) {
Collection<Long> result = exerciseOps(data, s -> s.flatMap(i -> Collections.singleton(i).stream().mapToLong(j -> j)));
Collection<Long> result = exerciseOps(data, s -> s.flatMap(LongStream::of));
assertEquals(data.size(), result.size());
assertContents(data, result);
result = exerciseOps(data, s -> s.boxed().flatMapToLong(LongStream::of));
assertEquals(data.size(), result.size());
assertContents(data, result);
......@@ -121,11 +159,30 @@ public class FlatMapOpTest extends OpTestCase {
exerciseOps(data, s -> s.flatMap(e -> LongStream.range(0, e).limit(10)));
}
@Test
public void testLongOpsShortCircuit() {
AtomicInteger count = new AtomicInteger();
LongStream.of(0).flatMap(i -> LongStream.range(0, 100)).
peek(i -> count.incrementAndGet()).
limit(10).toArray();
assertEquals(count.get(), 10);
count.set(0);
Stream.of(0).flatMapToLong(i -> LongStream.range(0, 100)).
peek(i -> count.incrementAndGet()).
limit(10).toArray();
assertEquals(count.get(), 10);
}
//
@Test(dataProvider = "DoubleStreamTestData", dataProviderClass = DoubleStreamTestDataProvider.class)
public void testDoubleOps(String name, TestData.OfDouble data) {
Collection<Double> result = exerciseOps(data, s -> s.flatMap(i -> Collections.singleton(i).stream().mapToDouble(j -> j)));
Collection<Double> result = exerciseOps(data, s -> s.flatMap(DoubleStream::of));
assertEquals(data.size(), result.size());
assertContents(data, result);
result = exerciseOps(data, s -> s.boxed().flatMapToDouble(DoubleStream::of));
assertEquals(data.size(), result.size());
assertContents(data, result);
......@@ -138,4 +195,19 @@ public class FlatMapOpTest extends OpTestCase {
exerciseOps(data, s -> s.flatMap(e -> IntStream.range(0, (int) e).asDoubleStream()));
exerciseOps(data, s -> s.flatMap(e -> IntStream.range(0, (int) e).limit(10).asDoubleStream()));
}
@Test
public void testDoubleOpsShortCircuit() {
AtomicInteger count = new AtomicInteger();
DoubleStream.of(0).flatMap(i -> IntStream.range(0, 100).asDoubleStream()).
peek(i -> count.incrementAndGet()).
limit(10).toArray();
assertEquals(count.get(), 10);
count.set(0);
Stream.of(0).flatMapToDouble(i -> IntStream.range(0, 100).asDoubleStream()).
peek(i -> count.incrementAndGet()).
limit(10).toArray();
assertEquals(count.get(), 10);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册