残念ながら、Stream APIには、独自の短絡操作を作成する機能がありません。あまりにもきれいな解決策は、RuntimeException
を投げてキャッチすることです。ここでIntStream
の実装ですが、それは、他のストリームのタイプのために一般化することができます。
public static int reduceWithCancelEx(IntStream stream, int identity,
IntBinaryOperator combiner, IntPredicate cancelCondition) {
class CancelException extends RuntimeException {
private final int val;
CancelException(int val) {
this.val = val;
}
}
try {
return stream.reduce(identity, (a, b) -> {
int res = combiner.applyAsInt(a, b);
if(cancelCondition.test(res))
throw new CancelException(res);
return res;
});
} catch (CancelException e) {
return e.val;
}
}
使用例:
int product = reduceWithCancelEx(
IntStream.of(2, 3, 4, 5, 0, 7, 8).peek(System.out::println),
1, (a, b) -> a * b, val -> val == 0);
System.out.println("Result: "+product);
出力:それは作品にもかかわらず、
2
3
4
5
0
Result: 0
注意並列ストリームでは、他の並列タスクが例外をスローするとすぐに終了することは保証されません。すでに開始されているサブタスクは完了するまで実行される可能性が高いため、予想以上に多くの要素を処理する可能性があります。
更新:はるかに長いが、より並列的な代替ソリューションです。これは、基本的なスプライテータに基づいています。このスプライテータは、基本要素をすべて累積した結果である最大でも1つの要素を返します。シーケンシャルモードで使用すると、すべての作業が単一のtryAdvance
コールで実行されます。分割すると、各パートは対応する単一の部分結果を生成し、コンバイナ関数を使用してストリームエンジンによって削減されます。ここには一般的なバージョンがありますが、原始的な特殊化も可能です。 Stream.reduce(identity, accumulator, combiner)
とStream.reduce(identity, combiner)
に似てい
final static class CancellableReduceSpliterator<T, A> implements Spliterator<A>,
Consumer<T>, Cloneable {
private Spliterator<T> source;
private final BiFunction<A, ? super T, A> accumulator;
private final Predicate<A> cancelPredicate;
private final AtomicBoolean cancelled = new AtomicBoolean();
private A acc;
CancellableReduceSpliterator(Spliterator<T> source, A identity,
BiFunction<A, ? super T, A> accumulator, Predicate<A> cancelPredicate) {
this.source = source;
this.acc = identity;
this.accumulator = accumulator;
this.cancelPredicate = cancelPredicate;
}
@Override
public boolean tryAdvance(Consumer<? super A> action) {
if (source == null || cancelled.get()) {
source = null;
return false;
}
while (!cancelled.get() && source.tryAdvance(this)) {
if (cancelPredicate.test(acc)) {
cancelled.set(true);
break;
}
}
source = null;
action.accept(acc);
return true;
}
@Override
public void forEachRemaining(Consumer<? super A> action) {
tryAdvance(action);
}
@Override
public Spliterator<A> trySplit() {
if(source == null || cancelled.get()) {
source = null;
return null;
}
Spliterator<T> prefix = source.trySplit();
if (prefix == null)
return null;
try {
@SuppressWarnings("unchecked")
CancellableReduceSpliterator<T, A> result =
(CancellableReduceSpliterator<T, A>) this.clone();
result.source = prefix;
return result;
} catch (CloneNotSupportedException e) {
throw new InternalError();
}
}
@Override
public long estimateSize() {
// let's pretend we have the same number of elements
// as the source, so the pipeline engine parallelize it in the same way
return source == null ? 0 : source.estimateSize();
}
@Override
public int characteristics() {
return source == null ? SIZED : source.characteristics() & ORDERED;
}
@Override
public void accept(T t) {
this.acc = accumulator.apply(this.acc, t);
}
}
方法が、cancelPredicate
と:
public static <T, U> U reduceWithCancel(Stream<T> stream, U identity,
BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner,
Predicate<U> cancelPredicate) {
return StreamSupport
.stream(new CancellableReduceSpliterator<>(stream.spliterator(), identity,
accumulator, cancelPredicate), stream.isParallel()).reduce(combiner)
.orElse(identity);
}
public static <T> T reduceWithCancel(Stream<T> stream, T identity,
BinaryOperator<T> combiner, Predicate<T> cancelPredicate) {
return reduceWithCancel(stream, identity, combiner, combiner, cancelPredicate);
}
のは、両方のバージョンをテストし、実際に処理されているどのように多くの要素を数えてみましょう。 0
を近づけてみましょう。例外のバージョン:
AtomicInteger count = new AtomicInteger();
int product = reduceWithCancelEx(
IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0)
.parallel().peek(i -> count.incrementAndGet()), 1,
(a, b) -> a * b, x -> x == 0);
System.out.println("product: " + product + "/count: " + count);
Thread.sleep(1000);
System.out.println("product: " + product + "/count: " + count);
典型的な出力:唯一のいくつかの要素が処理されるとき、結果が返されながら
product: 0/count: 281721
product: 0/count: 500001
ので、タスクはバックグラウンドで作業し、カウンタがまだ増加している続けています。
AtomicInteger count = new AtomicInteger();
int product = reduceWithCancel(
IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0)
.parallel().peek(i -> count.incrementAndGet()).boxed(),
1, (a, b) -> a * b, x -> x == 0);
System.out.println("product: " + product + "/count: " + count);
Thread.sleep(1000);
System.out.println("product: " + product + "/count: " + count);
典型的な出力:ここspliteratorバージョンです
product: 0/count: 281353
product: 0/count: 281353
結果が返されたときにすべてのタスクが実際に終了します。
これは非常に一般的だと思うのですが、すべての乗算に*条件付き*を加える価値はありますか?それを補うためには、その後の乗算がたくさん必要になります。そして、ループの乗算に関してホットスポットがあなたよりスマートだと分かるかもしれません... – Holger
@Holgerこれは主に学問的な質問ですが、短絡の可能性がある他の少なくともいくつかの削減について考えることができます。 (bitwiseとvs 0、bitwiseまたはvs 0xffff、完全に埋められた部分データと部分データのマージ...) – bowmore
@Holger、短絡削減が役立つ例:セットのストリームを交差させる結果は空)、EnumSetsのストリームの和集合(中間結果にすべての可能な値が含まれると取り消されます)、文字列のストリームに文字列長制限(必要に応じて省略記号を追加)を追加します。 –