4

Async Http Client library(Nettyを使用)を使用して、RESTful APIに非同期Http Getリクエストを送信しています。ノンブロッキングの動作を保持したいので、Http Getリクエストの結果としてCompletableFuture<T>のインスタンスを返します。したがって、RESTfulなAPIエンドポイントがJson配列を返すところでは、私はCompletableFuture<T[]>を返しています。CompletableFuture <Stream<T>>をブロックすることなく<T>に変換するにはどうすればいいですか

まだ、The Four Essential Effects In ProgrammingについてのErik Meijerの分類によると、私はStream<T>が非同期Http Get要求を行い、Json配列を返すJavaメソッドの結果に、より適していると考えます。この場合、Stream<T>Observable<T>と等価で、多くの値を返す非同期計算の結果はとなります。

ので、respが応答を保持していることを考えると、私は、次のようCompletableFuture<Stream<T>>を取得することができます:

CompletableFuture<T[]> resp = … 
return resp.thenApply(Arrays::stream); 

をしかし、私はに計算を待たずに、私はCompletableFuture<Stream<T>> respStream<T>に変換することができますどのように思っていました完了しました(つまり、私はget()呼び出しをブロックしたくありません)?

私は、次の式と同じ結果を持って好きですが、get()にブロックすることなくなります

return resp.thenApply(Arrays::stream).get(); 

答えて

4

あなたはちょうどこのように、Future<T>get()メソッドへの呼び出しを延期しますStream<T>を構築することができます。

CompletableFuture<T[]> resp = ... 
return Stream 
     .of(resp)        // Stream<CompletableFuture<T[]>> 
     .flatMap(f -> Arrays.stream(f.join())); // Stream<T> 

代わりget()の私がチェック例外を回避するためにjoin()を使用しています、使用方法を簡素化します。

+2

それはストリーム操作が 'CompletableFuture'の完了までブロックされるという事実は変わりません。それはちょうどそれを少し高い価格で遅らせます。 – Holger

+3

オフコース。しかし、たびに横断されます。 OPでは、著者は '[] T'eitherをトラバースしていません。 –

+2

確かに。私は制限を強調したかっただけです(私はOPがそれらを認識しているかどうか分かりません)。私はそれらについてより詳細に解答を加えました。 – Holger

3

非同期計算の結果が配列として渡される限り、ストリーム操作は配列がハンドオフされるまで要素の処理を開始できないため、ここでStream APIを使用することはできません。非同期ジョブの完全な完了。

非同期ジョブを書き換えて配列の個々の要素を公開しない限り、キューを介して、ストリームのターミナル操作が開始された時点まで同期を延期することができます。つまり、非同期ジョブの完了を待つ前に、中間操作をStreamに連鎖させることができます。チェーニングは高価な操作ではないので、ゲインは非常に小さくなります。

まだ実行したい場合は、Miguel Gamboa’s solutionStream.of(resp).flatMap(f -> Arrays.stream(f.join()))となります。簡潔です。残念ながら、それは、join操作を延期することの利点を上回るパフォーマンスの欠点を有する可能性がある。アレイのストリーミングは、アレイの予測可能な長さと均衡分割のサポートがスムーズに行われますが、ネストされたストリームはこれらの機能が欠けているだけでなく、current implementation even lacks short-circuiting processingです。

static <T> Stream<T> getStream(CompletableFuture<T[]> resp) { 
    return StreamSupport.stream(() -> Arrays.spliterator(resp.join()), 
     Spliterator.ORDERED|Spliterator.SIZED|Spliterator.SUBSIZED|Spliterator.IMMUTABLE, 
     false); 
} 

これは、までjoin操作を延期ストリームを作成します。代わりに、ストリームの作成を延期するためにflatMapを利用するので、より多くの繰延ストリームの作成が直接サポートされ、より深い1つのレベルを、行くことをお勧めします

ターミナル操作は開始されますが、配列ベースのストリームのパフォーマンス特性は引き続きあります。しかし、コードは明らかに複雑であり、非同期操作を提供している配列がまだ実行されている間に中間操作を連鎖させる可能性があるという利点があります。

関連する問題