2017-10-19 19 views
5

のベクトル/ストリームデータの構成可能な戻り値は、ウィジェットを見つけたり、構築します、私はいくつかのクエリ基準に基づいて、APIを持って言う:非同期、Javaの9

Widget getMatchingWidget(WidgetCriteria c) throws Throwable 

(同期)クライアントコードのルックス以下のような:

try { 
    Widget w = getMatchingWidget(criteria); 
    processWidget(w); 
} catch (Throwable t) { 
    handleError(t); 
} 

今言う発見やウィジェットを構築することは予測できない高価であり、私はそれを待っている間に、クライアントがブロックしたくありません。

CompletableFuture<Widget> getMatchingWidget(WidgetCriteria c) 

クライアントはその後、書くことができ、次のいずれか:

CompletableFuture<Widget> f = getMatchingWidget(criteria); 
f.thenAccept(this::processWidget) 
f.exceptionally(t -> { handleError(t); return null; }) 

か:だから私はそれを変更する今

getMatchingWidget(criteria).whenComplete((t, w) -> { 
    if (t != null) { handleError(t); } 
    else { processWidget(t); } 
}); 

、代わりに同期APIは、n個のウィジェットへ0を返すことができるのは、言わせて:

Stream<Widget> getMatchingWidgets(WidgetCriteria c) 

単純に、私は書くことができます:

CompletableFuture<Stream<Widget>> getMatchingWidgets(WidgetCriteria c) 

しかし、これは実際にコードノンブロッキング、それだけで周りのブロックをプッシュことはありません - のいずれかFutureすべてブロックWidgetsが使用可能になるまで、またはそれぞれWidgetを待っているブロックStreamを反復するコード。私が欲しいのは、彼らが到着すると、私は各ウィジェットを処理できるようになるものですのようなもの:

void forEachMatchingWidget(WidgetCriteria c, Consumer<Widget> widgetProcessor) 

しかし、これはエラー処理を提供していない、と私は追加Consumer<Throwable> errorHandlerを追加した場合でも、それは私を聞かせていませんたとえば、私のウィジェット検索を他のクエリで構成したり、結果を変換することができます。

Stream(反復可能性、変換可能性)の特性とCompletableFutureの特性(非同期結果とエラー処理)を組み合わせた構成可能なものを探しています。 (そして、私たちがそれをしている間に、背圧がうまくいくかもしれません。)

これはjava.util.concurrent.Flow.Publisherですか? io.reactivex.Observable?もっと複雑なこと?何か簡単ですか?

+2

私はキューに物事をプッシュし、キューから物事を引き出す傾向があります。 –

答えて

6

あなたのユースケースは、RxJavaが対処している世界に非常に自然に入り込みます。我々が観察している場合:

getMatchingWidgets(wc) 
    .subscribeOn(backgroundScheduler) 
    .subscribe(w -> processWidget(w), 
       error -> handleError(error)); 

を観察可能なチェーンがbackgroundScheduler上で実行されます:

Observable<Widget> getMatchingWidgets(wc); 

基準に基づいて、ゼロ個以上のウィジェットを作成することを、それが使用して表示されたとして、あなたは、各ウィジェットを処理することができます多くの場合、スレッドプールエグゼキュータサービスのラッパーです。あなたのUIで各ウィジェットの最終処理を行う必要がある場合は、あなたが処理する前に、UIスケジューラに切り替えることobserveOn()演算子を使用することができます。

getMatchingWidgets(wc) 
    .subscribeOn(backgroundScheduler) 
    .observeOn(uiScheduler) 
    .subscribe(w -> processWidget(w), 
       error -> handleError(error)); 

を私に、RxJavaアプローチのエレガンスはそれということです非常に多くのパイプライン管理のナットとボルトを流暢に扱います。そのオブザーバーチェーンを見ると、何が起こっているのか、そしてどこであるのか正確に分かります。

関連する問題