2016-12-10 12 views
0

私はCompletableFuturerunAfterBothで複数の先物をマージする機能を持っているが、2つ以上をマージしたいのであればどうなるでしょうか?複数の非同期CompletableFuturesの参加を処理する方法は?

CompletableFuture<Boolean> a = new CompletableFuture<>(); 
CompletableFuture<Boolean> b = new CompletableFuture<>(); 
CompletableFuture<Boolean> c = new CompletableFuture<>(); 

List<CompletableFuture<Boolean>> list = new LinkedList<>(); 

list.add(a); 
list.add(b); 
list.add(c); 

// Could be any number 
for (CompletableFuture<Boolean> f : list) { 
    f.runAfter.. 
} 

私の使用例は、メッセージを複数のソケットに送信して、いずれかのオブジェクトにあるかもしれない単一のオブジェクトを見つけることです。

私は現在、解決策として、これを探しています:

CompletableFuture<Boolean> a = new CompletableFuture<>(); 
CompletableFuture<Boolean> b = new CompletableFuture<>(); 
CompletableFuture<Boolean> c = new CompletableFuture<>(); 

List<CompletableFuture<Boolean>> list = new LinkedList<>(); 

list.add(a); 
list.add(b); 
list.add(c); 

CompletableFuture<Boolean> result = new CompletableFuture<>(); 

Thread accept = new Thread(() -> { 
    for (CompletableFuture<Boolean> f : list) 
     if (f.join() != null) 
     result.complete(f.join()); 
}); 

accept.start(); 

// Actual boolean value returned 
result.get(); 

しかし、それは混乱のようなものです。私の場合は、無効な結果を待つのではなく、有効な結果(nullではない)を取得するとすぐに処理を続けたいと思います。

たとえば、aは5秒かかり、bがすでに2秒で完了していてもループが待機しています。ループはそれがまだaを待っているのでそれを知らない。

正常終了時にすぐに対応できる複数の非同期先物を結合するパターンがありますか?

別の可能性:

public static class FutureUtil { 
public static <T> CompletableFuture<T> anyOfNot(
    Collection<CompletableFuture<T>> collection, 
    T value, 
    T defaultValue) 
{ 
    CompletableFuture<T> result = new CompletableFuture<>(); 

    new Thread(() -> { 
     for (CompletableFuture<T> f : collection) { 
     f.thenAccept((
      T r) -> { 
      if ((r != null && !r.equals(value)) 
       || (value != null && !value.equals(r))) 
       result.complete(r); 
     }); 
     } 

     try { 
     for (CompletableFuture<T> f : collection) 
      f.get(); 
     } 
     catch (Exception ex) { 
     result.completeExceptionally(ex); 
     } 

     result.complete(defaultValue); 
    }).start(); 

    return result; 
} 
} 

使用例:

CompletableFuture<Boolean> a = new CompletableFuture<>(); 
CompletableFuture<Boolean> b = new CompletableFuture<>(); 
CompletableFuture<Boolean> c = new CompletableFuture<>(); 

List<CompletableFuture<Boolean>> list = new LinkedList<>(); 

list.add(a); 
list.add(b); 
list.add(c); 

CompletableFuture<Boolean> result = FutureUtil.anyOfNot(list, null, false); 

result.get(); 
+0

を探しています:もっと複雑なものが必要#allOf-java.util.concurrent.CompletableFuture ...-)? – teppic

+0

並べ替えそのうちの1つが既に有効な結果(nullではない)で完了している場合、他の先物については待っていません。 – Zhro

+0

多分['anyOf'](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html#anyOf-java.util.concurrent.CompletableFuture...- )? – Marco13

答えて

1

は、少なくとも1つのリストのCFSはnull以外の値で終了しますならばということ、あなたはこれを試すことができます知っている場合:

public static <T> CompletableFuture<T> firstNonNull(List<CompletableFuture<T>> completableFutures) { 

    final CompletableFuture<T> completableFutureResult = new CompletableFuture<>(); 
    completableFutures.forEach(cf -> cf.thenAccept(v -> { 
     if (v != null) { 
      completableFutureResult.complete(v); 
     } 
    })); 
    return completableFutureResult; 
} 

少なくとも1つのCFがnull以外の値を返すという保証がない場合、あなたは[CompletableFuture.allOf()](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.htmlため

public static <T> CompletableFuture<T> firstNonNull(List<CompletableFuture<T>> completableFutures, T defaultValue) { 

    final CompletableFuture<T> completableFutureResult = new CompletableFuture<>(); 
    completableFutures.forEach(cf -> cf.thenAccept(v -> { 
     if (v != null) { 
      completableFutureResult.complete(v); 
     } 
    })); 
    //handling the situation where all the CFs returned null 
    CompletableFuture<Void> allCompleted = CompletableFuture 
     .allOf((CompletableFuture<?>[]) completableFutures.toArray()); 
    allCompleted.thenRun(() -> { 
     //checking first if any of the completed delivered a non-null value, to avoid race conditions with the block above 
     completableFutures.forEach(cf -> { 
      final T result = cf.join(); 
      if (result != null) { 
       completableFutureResult.complete(result); 
      } 
     }); 
     //if still not completed, completing with default value 
     if (!completableFutureResult.isDone()) { 
      completableFutureResult.complete(defaultValue); 
     } 
    }); 
    return completableFutureResult; 
} 
+0

ありがとうございました。しかし、これは私の質問の終わりのコードサンプルとほぼ同じですが、例外処理とデフォルト値はありません。既に完了している未来に 'isDone()'をチェックする必要はないことに注意してください。 – Zhro

+0

あなたのソリューションでは、不要な 'Thread'を作成し、それをブロックしています。同じ作業をするために余分な糸が必要ないことを私は示しました。 _既に完了している未来についてisDone()を確認する必要はありません:うーん、その段階で完了したかどうかはわかりません。 'allAf'ブロックが' thenAccept'の前に実行されている可能性があります。 あなたは例外処理を正しく行っていますが、それはあなたの質問の一部ではありませんでした。 – Ruben

+0

コードを実行しようとすると次の例外が発生します。あなたは確認できますか? 'java.lang.ClassCastException:[Ljava.lang.Object; [Ljava.util.concurrent]にキャストすることはできません。CompletableFuture' – Zhro

関連する問題