誰でも地図とmapAsync w.r.t AKKAストリームの違いを教えていただけますか? In the documentationマップとmapAsyncの違い
外部の非ストリーム ベースのサービスを含むストリームの変換や副作用がmapAsyncを行うことができるかmapAsyncUnordered
なぜカント我々は単に私たちがここにマッピングすると言われて?フロー、ソース、シンクはすべて本質的にモナドであると仮定し、マップはこれらの性質上の遅延をうまく機能するはずですか?
誰でも地図とmapAsync w.r.t AKKAストリームの違いを教えていただけますか? In the documentationマップとmapAsyncの違い
外部の非ストリーム ベースのサービスを含むストリームの変換や副作用がmapAsyncを行うことができるかmapAsyncUnordered
なぜカント我々は単に私たちがここにマッピングすると言われて?フロー、ソース、シンクはすべて本質的にモナドであると仮定し、マップはこれらの性質上の遅延をうまく機能するはずですか?
署名
差が最良signaturesで強調表示されている:Flow.map
はFlow.mapAsync
タイプFuture[T]
を返す関数を取り込みながらタイプT
を返す関数を取り込みます。
実践例例として
、我々は、ユーザIDに基づいて、ユーザのフルネームのためのデータベース照会機能を持っていることとしますSource
UserID
指定された値
type UserID = String
type FullName = String
val databaseLookup : UserID => FullName = ??? //implementation unimportant
をStream内で単にFlow.map
を使用してデータベースに問い合わせ、フルネームをコンソールに出力することができます:
val userIDSource : Source[UserID, _] = ???
val stream =
userIDSource.via(Flow[UserID].map(databaseLookup))
.to(Sink.foreach[FullName](println))
.run()
この実装の1つの制限は、このストリームが一度に1 dbクエリのみを作成することです。これは「ボトルネック」になり、ストリーム内の最大スループットを妨げる可能性があります。パフォーマンスを向上させるために、我々は単にFuture
の内側databaseLookup
をラップすることにより、並行処理を追加することができます。
def concurrentDBLookup(userID : UserID) : Future[FullName] =
Future { databaseLookup(userID) }
val concurrentStream =
userIDSource.via(Flow[UserID].map(concurrentDBLookup))
.to(Sink.foreach[Future[FullName]](_ foreach println))
.run()
この単純な同時実行の補遺に問題は、我々は効果的に背圧を排除しているということです。シンクは未来を引っ張っているだけで、foreach println
を追加しているため、ストリームはデータベースクエリーに比べて比較的高速ですが、ストリームは継続的にソースに需要を伝播し、より多くの先物を生成します。これは、同時に実行されているdatabaseLookup
の数に制限がないことを意味します。
Flow.mapAsync
レスキュー;同時に同時ルックアップの回数をキャッピングしながら、我々は、同時デシベルルックアップを持つことができます。
val maxLookupCount = 10
val maxLookupConcurrentStream =
userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup))
.to(Sink.foreach[FullName](println))
.run()
もSink.foreach
は単純だ、それはもはやFuture[FullName]
になりますが、代わりにちょうどFullName
ことがわかりません。順序なし非同期地図
あなたがFlow.mapAsyncUnordered
を使用することができますFullNamesにユーザーIDの順序を維持することを気にしない場合は
。あなたが気にしていたのは、すべてのフルネームを印刷していても、コンソールに到着した順番は気にしなかった場合に便利です。
'mapAsync'は特定のステージに非同期境界を適用するのと似ていますか?ドキュメンテーションによると、非同期境界をマークすることでアクター内の各ステージが実行されます。 – jarvis11