2016-02-02 2 views
14

誰でも地図とmapAsync w.r.t AKKAストリームの違いを教えていただけますか? In the documentationマップとmapAsyncの違い

外部の非ストリーム ベースのサービスを含むストリームの変換や副作用がmapAsyncを行うことができるかmapAsyncUnordered

なぜカント我々は単に私たちがここにマッピングすると言われて?フロー、ソース、シンクはすべて本質的にモナドであると仮定し、マップはこれらの性質上の遅延をうまく機能するはずですか?

答えて

31

署名

差が最良signaturesで強調表示されている:Flow.mapFlow.mapAsyncタイプFuture[T]を返す関数を取り込みながらタイプTを返す関数を取り込みます。

実践例例として

、我々は、ユーザIDに基づいて、ユーザのフルネームのためのデータベース照会機能を持っていることとしますSourceUserID指定された値

type UserID = String 
type FullName = String 

val databaseLookup : UserID => FullName = ??? //implementation unimportant 

をStream内で単にFlow.mapを使用してデータベースに問い合わせ、フルネームをコンソールに出力することができます:

val userIDSource : Source[UserID, _] = ??? 

val stream = 
    userIDSource.via(Flow[UserID].map(databaseLookup)) 
       .to(Sink.foreach[FullName](println)) 
       .run() 

この実装の1つの制限は、このストリームが一度に1 dbクエリのみを作成することです。これは「ボトルネック」になり、ストリーム内の最大スループットを妨げる可能性があります。パフォーマンスを向上させるために、我々は単にFutureの内側databaseLookupをラップすることにより、並行処理を追加することができます。

def concurrentDBLookup(userID : UserID) : Future[FullName] = 
    Future { databaseLookup(userID) } 

val concurrentStream = 
    userIDSource.via(Flow[UserID].map(concurrentDBLookup)) 
       .to(Sink.foreach[Future[FullName]](_ foreach println)) 
       .run() 

この単純な同時実行の補遺に問題は、我々は効果的に背圧を排除しているということです。シンクは未来を引っ張っているだけで、foreach printlnを追加しているため、ストリームはデータベースクエリーに比べて比較的高速ですが、ストリームは継続的にソースに需要を伝播し、より多くの先物を生成します。これは、同時に実行されているdatabaseLookupの数に制限がないことを意味します。

Flow.mapAsyncレスキュー;同時に同時ルックアップの回数をキャッピングしながら、我々は、同時デシベルルックアップを持つことができます。

val maxLookupCount = 10 

val maxLookupConcurrentStream = 
    userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup)) 
       .to(Sink.foreach[FullName](println)) 
       .run() 

Sink.foreachは単純だ、それはもはやFuture[FullName]になりますが、代わりにちょうどFullNameことがわかりません。順序なし非同期地図

あなたがFlow.mapAsyncUnorderedを使用することができますFullNamesにユーザーIDの順序を維持することを気にしない場合は

。あなたが気にしていたのは、すべてのフルネームを印刷していても、コンソールに到着した順番は気にしなかった場合に便利です。

+0

'mapAsync'は特定のステージに非同期境界を適用するのと似ていますか?ドキュメンテーションによると、非同期境界をマークすることでアクター内の各ステージが実行されます。 – jarvis11

関連する問題