2016-07-18 6 views
2

私のストリームは、出力がList [Any]オブジェクトのFlowを持っています。 mapAsyncの後に、リストの代わりに個々の要素を処理するいくつかの段階があります。どうやってやるの?私はこれを行うにはどうすればよいAkka stream - List to map個々の要素のアサイナ

Flow[Any].mapAsyncUnordered(4) { listElement => 
    actorRef ? listElement 
}.someOtherStuff 

-

が効果的に私がで

Flow[Any].map { msg => 
    someListDerivedFrom(msg) 
} 

の出力が消費されるように接続したいですか?

答えて

4

あなたが探しているコンビネータはmapConcatだと思います。このコンビネータは入力引数をとり、Iterableという何かを返します。次のように簡単な例は次のようになります。

ここ
implicit val system = ActorSystem() 
implicit val mater = ActorMaterializer() 

val source = Source(List(List(1,2,3), List(4,5,6))) 
val sink = Sink.foreach[Int](println) 

val graph = 
    source. 
    mapConcat(identity). 
    to(sink) 
graph.run 

、私SourceList要素を吐き出している、と私Sinkは、これらのList秒に何があるかの基になる型を受け入れます。タイプが異なるので、それらを直接接続することはできません。しかし、それらの間にmapConcatを適用すると、それらのコンビネータはそれらの要素をフラット化して個々の要素(Int)を下流に送信するので、接続することができます。入力エレメントmapConcatは既にIterableになっていますので、mapConcatの本体にあるidentify関数を使用するだけで作業ができます。

+0

ありがとうございます。私はmapConcatを少し違って試していましたが、それはうまくいきませんでした。 「アイデンティティ」の意味は正確に何か – anindyaju99

+0

_identity_は_scala.Predef_で定義されていて、まるでそのように見えます。何かを自分自身に変換する関数です。 _map_、_flatMap_などで必要とされる関数として渡すのに非常に便利です。 – Phasmid