のベクトル/ストリームデータの構成可能な戻り値は、ウィジェットを見つけたり、構築します、私はいくつかのクエリ基準に基づいて、APIを持って言う:非同期、Javaの9
Widget getMatchingWidget(WidgetCriteria c) throws Throwable
(同期)クライアントコードのルックス以下のような:
try {
Widget w = getMatchingWidget(criteria);
processWidget(w);
} catch (Throwable t) {
handleError(t);
}
今言う発見やウィジェットを構築することは予測できない高価であり、私はそれを待っている間に、クライアントがブロックしたくありません。
CompletableFuture<Widget> getMatchingWidget(WidgetCriteria c)
クライアントはその後、書くことができ、次のいずれか:
CompletableFuture<Widget> f = getMatchingWidget(criteria);
f.thenAccept(this::processWidget)
f.exceptionally(t -> { handleError(t); return null; })
か:だから私はそれを変更する今
getMatchingWidget(criteria).whenComplete((t, w) -> {
if (t != null) { handleError(t); }
else { processWidget(t); }
});
、代わりに同期APIは、n個のウィジェットへ0を返すことができるのは、言わせて:
Stream<Widget> getMatchingWidgets(WidgetCriteria c)
単純に、私は書くことができます:
CompletableFuture<Stream<Widget>> getMatchingWidgets(WidgetCriteria c)
しかし、これは実際にコードノンブロッキング、それだけで周りのブロックをプッシュことはありません - のいずれかFuture
すべてブロックWidgets
が使用可能になるまで、またはそれぞれWidget
を待っているブロックStream
を反復するコード。私が欲しいのは、彼らが到着すると、私は各ウィジェットを処理できるようになるものですのようなもの:
void forEachMatchingWidget(WidgetCriteria c, Consumer<Widget> widgetProcessor)
しかし、これはエラー処理を提供していない、と私は追加Consumer<Throwable> errorHandler
を追加した場合でも、それは私を聞かせていませんたとえば、私のウィジェット検索を他のクエリで構成したり、結果を変換することができます。
Stream
(反復可能性、変換可能性)の特性とCompletableFuture
の特性(非同期結果とエラー処理)を組み合わせた構成可能なものを探しています。 (そして、私たちがそれをしている間に、背圧がうまくいくかもしれません。)
これはjava.util.concurrent.Flow.Publisherですか? io.reactivex.Observable?もっと複雑なこと?何か簡単ですか?
私はキューに物事をプッシュし、キューから物事を引き出す傾向があります。 –