2015-09-10 3 views
20

これは本質的にHow to short-circuit reduce on Stream?と同じ質問です。しかし、この質問はブール値のストリームに焦点を当てており、その答えは他の型に対して一般化できず、操作を減らすことができないので、より一般的な質問をしたいと思います。ストリームのreduce()操作を短絡する方法はありますか?

還元操作のためにabsorbing elementに遭遇したとき、ストリームを短絡させて短絡させるにはどうすればよいですか?

典型的な数学的ケースは、乗算のために0になります。このStream

int product = IntStream.of(2, 3, 4, 5, 0, 7, 8) 
     .reduce(1, (a, b) -> a * b); 

かかわらず0に遭遇された後、製品が知られているという事実の最後の二つの要素(78)を消費します。

+1

これは非常に一般的だと思うのですが、すべての乗算に*条件付き*を加える価値はありますか?それを補うためには、その後の乗算がたくさん必要になります。そして、ループの乗算に関してホットスポットがあなたよりスマートだと分かるかもしれません... – Holger

+0

@Holgerこれは主に学問的な質問ですが、短絡の可能性がある他の少なくともいくつかの削減について考えることができます。 (bitwiseとvs 0、bitwiseまたはvs 0xffff、完全に埋められた部分データと部分データのマージ...) – bowmore

+2

@Holger、短絡削減が役立つ例:セットのストリームを交差させる結果は空)、EnumSetsのストリームの和集合(中間結果にすべての可能な値が含まれると取り消されます)、文字列のストリームに文字列長制限(必要に応じて省略記号を追加)を追加します。 –

答えて

6

残念ながら、Stream APIには、独自の短絡操作を作成する機能がありません。あまりにもきれいな解決策は、RuntimeExceptionを投げてキャッチすることです。ここでIntStreamの実装ですが、それは、他のストリームのタイプのために一般化することができます。

public static int reduceWithCancelEx(IntStream stream, int identity, 
         IntBinaryOperator combiner, IntPredicate cancelCondition) { 
    class CancelException extends RuntimeException { 
     private final int val; 

     CancelException(int val) { 
      this.val = val; 
     } 
    } 

    try { 
     return stream.reduce(identity, (a, b) -> { 
      int res = combiner.applyAsInt(a, b); 
      if(cancelCondition.test(res)) 
       throw new CancelException(res); 
      return res; 
     }); 
    } catch (CancelException e) { 
     return e.val; 
    } 
} 

使用例:

int product = reduceWithCancelEx(
     IntStream.of(2, 3, 4, 5, 0, 7, 8).peek(System.out::println), 
     1, (a, b) -> a * b, val -> val == 0); 
System.out.println("Result: "+product); 

出力:それは作品にもかかわらず、

2 
3 
4 
5 
0 
Result: 0 

注意並列ストリームでは、他の並列タスクが例外をスローするとすぐに終了することは保証されません。すでに開始されているサブタスクは完了するまで実行される可能性が高いため、予想以上に多くの要素を処理する可能性があります。

更新:はるかに長いが、より並列的な代替ソリューションです。これは、基本的なスプライテータに基づいています。このスプライテータは、基本要素をすべて累積した結果である最大でも1つの要素を返します。シーケンシャルモードで使用すると、すべての作業が単一のtryAdvanceコールで実行されます。分割すると、各パートは対応する単一の部分結果を生成し、コンバイナ関数を使用してストリームエンジンによって削減されます。ここには一般的なバージョンがありますが、原始的な特殊化も可能です。 Stream.reduce(identity, accumulator, combiner)Stream.reduce(identity, combiner)に似てい

final static class CancellableReduceSpliterator<T, A> implements Spliterator<A>, 
     Consumer<T>, Cloneable { 
    private Spliterator<T> source; 
    private final BiFunction<A, ? super T, A> accumulator; 
    private final Predicate<A> cancelPredicate; 
    private final AtomicBoolean cancelled = new AtomicBoolean(); 
    private A acc; 

    CancellableReduceSpliterator(Spliterator<T> source, A identity, 
      BiFunction<A, ? super T, A> accumulator, Predicate<A> cancelPredicate) { 
     this.source = source; 
     this.acc = identity; 
     this.accumulator = accumulator; 
     this.cancelPredicate = cancelPredicate; 
    } 

    @Override 
    public boolean tryAdvance(Consumer<? super A> action) { 
     if (source == null || cancelled.get()) { 
      source = null; 
      return false; 
     } 
     while (!cancelled.get() && source.tryAdvance(this)) { 
      if (cancelPredicate.test(acc)) { 
       cancelled.set(true); 
       break; 
      } 
     } 
     source = null; 
     action.accept(acc); 
     return true; 
    } 

    @Override 
    public void forEachRemaining(Consumer<? super A> action) { 
     tryAdvance(action); 
    } 

    @Override 
    public Spliterator<A> trySplit() { 
     if(source == null || cancelled.get()) { 
      source = null; 
      return null; 
     } 
     Spliterator<T> prefix = source.trySplit(); 
     if (prefix == null) 
      return null; 
     try { 
      @SuppressWarnings("unchecked") 
      CancellableReduceSpliterator<T, A> result = 
       (CancellableReduceSpliterator<T, A>) this.clone(); 
      result.source = prefix; 
      return result; 
     } catch (CloneNotSupportedException e) { 
      throw new InternalError(); 
     } 
    } 

    @Override 
    public long estimateSize() { 
     // let's pretend we have the same number of elements 
     // as the source, so the pipeline engine parallelize it in the same way 
     return source == null ? 0 : source.estimateSize(); 
    } 

    @Override 
    public int characteristics() { 
     return source == null ? SIZED : source.characteristics() & ORDERED; 
    } 

    @Override 
    public void accept(T t) { 
     this.acc = accumulator.apply(this.acc, t); 
    } 
} 

方法が、cancelPredicateと:

public static <T, U> U reduceWithCancel(Stream<T> stream, U identity, 
     BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner, 
     Predicate<U> cancelPredicate) { 
    return StreamSupport 
      .stream(new CancellableReduceSpliterator<>(stream.spliterator(), identity, 
        accumulator, cancelPredicate), stream.isParallel()).reduce(combiner) 
      .orElse(identity); 
} 

public static <T> T reduceWithCancel(Stream<T> stream, T identity, 
     BinaryOperator<T> combiner, Predicate<T> cancelPredicate) { 
    return reduceWithCancel(stream, identity, combiner, combiner, cancelPredicate); 
} 

のは、両方のバージョンをテストし、実際に処理されているどのように多くの要素を数えてみましょう。 0を近づけてみましょう。例外のバージョン:

AtomicInteger count = new AtomicInteger(); 
int product = reduceWithCancelEx(
     IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0) 
       .parallel().peek(i -> count.incrementAndGet()), 1, 
     (a, b) -> a * b, x -> x == 0); 
System.out.println("product: " + product + "/count: " + count); 
Thread.sleep(1000); 
System.out.println("product: " + product + "/count: " + count); 

典型的な出力:唯一のいくつかの要素が処理されるとき、結果が返されながら

product: 0/count: 281721 
product: 0/count: 500001 

ので、タスクはバックグラウンドで作業し、カウンタがまだ増加している続けています。

AtomicInteger count = new AtomicInteger(); 
int product = reduceWithCancel(
     IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0) 
       .parallel().peek(i -> count.incrementAndGet()).boxed(), 
       1, (a, b) -> a * b, x -> x == 0); 
System.out.println("product: " + product + "/count: " + count); 
Thread.sleep(1000); 
System.out.println("product: " + product + "/count: " + count); 

典型的な出力:ここspliteratorバージョンです

product: 0/count: 281353 
product: 0/count: 281353 

結果が返されたときにすべてのタスクが実際に終了します。

+0

私は更新コードがうまくいくと思いますが、現時点ではコードの貼り付けに問題があります。 – bowmore

+0

@bowmore、どちらの問題?おそらくいくつかの輸入品はありませんか? –

+0

私のJDK 8は自宅でも最新ではないことが分かりました。最新のインストール時にタイプ推論の問題が解決しました。 – bowmore

3

一般的な短絡静的低減方法は、ストリームのスプライテータを使用して実装できます。それはあまり複雑ではないことが判明しました! spliteratorの使用は、より柔軟な方法でSteamを使用したい場合に、多くの時間を費やす方法と思われます。

public static <T> T reduceWithCancel(Stream<T> s, T acc, BinaryOperator<T> op, Predicate<? super T> cancelPred) { 
    BoxConsumer<T> box = new BoxConsumer<T>(); 
    Spliterator<T> splitr = s.spliterator(); 

    while (!cancelPred.test(acc) && splitr.tryAdvance(box)) { 
     acc = op.apply(acc, box.value); 
    } 

    return acc; 
} 

public static class BoxConsumer<T> implements Consumer<T> { 
    T value = null; 
    public void accept(T t) { 
     value = t; 
    } 
} 

使用:

int product = reduceWithCancel(
     Stream.of(1, 2, 0, 3, 4).peek(System.out::println), 
     1, (acc, i) -> acc * i, i -> i == 0); 

    System.out.println("Result: " + product); 

出力:

1 
2 
0 
Result: 0 

方法は、端末の動作の他の種類を実行するように一般化することができます。

これはtake-while操作についてのthis answerに緩く基づいています。

私はこの並列化の可能性について何も知らない。

+1

私の解決策では、cancelPredicateはreduceの結果をテストした次の要素。この場合、実際にはより良いことです(例えば、Javaでは '65536 * 65536 == 0'、引数はどちらもゼロではありません)。あなたの答えは同じように簡単に適応することができます。私は並列に優しいスプライテータベースのアイデアを持っていますが、それを正しくコーディングする時間が必要です。 –

+1

@TagirValeev:これを並列で実行することは、スプライテータで面白い演習になります。あなたが何かを終えたら投稿してください! – Lii

+1

@ TagirValeev:* "私のソリューションでは、cancelPredicateが次の要素ではなく、削減の結果をテストしたことに注意してください。" *もう一度考えると、これも良いと思います。この場合だけではなく一般的に。この操作は、中断したい値になります。私は答えにそれを編集しました。それはまた私にラインを救った! – Lii

2

これは自分ではreduce()を使用しないで、既存の短絡の最終動作を使用することです。

副作用のある述語を使用する場合、noneMatch()またはallMatch()を使用してこれを使用できます。確かに最も清潔な解決策ではありませんが、目標を達成します。

AtomicInteger product = new AtomicInteger(1); 
IntStream.of(2, 3, 4, 5, 0, 7, 8) 
     .peek(System.out::println) 
     .noneMatch(i -> { 
      if (i == 0) { 
       product.set(0); 
       return true; 
      } 
      int oldValue = product.get(); 
      while (oldValue != 0 && !product.compareAndSet(oldValue, i * oldValue)) { 
       oldValue = product.get(); 
      } 
      return oldValue == 0; 
     }); 
System.out.println("Result: " + product.get()); 

短絡して並列にすることができます。

+0

面白い解決法ですが、それは可換コンバイナでのみ機能します。通常、ストリームAPIではコンバイナ機能のために可換性は必要ありません。それでもなおupvoted。私はこれをオブジェクトストリームとカスタムアイデンティティ/コンバイナ/述語メソッドに対して一般化しましたが、私のスプライテータソリューションよりも少し速いです。 –

+0

私のスプライテータの 'estimateSize()'に小さな誤りを修正しました( 'Long.MAX_VALUE'を返すことで、ストリームエンジンは必要以上に多くの並列タスクを生成しました)。現在、私のパラレルバージョンは一般化されたバージョンよりも速く動作しますが、シーケンシャルはまだあなたよりもいくぶん遅いです。 –

関連する問題