2016-06-18 8 views
12

各行を処理するために、akkaストリームを使用して複数の大きなファイルを読み込みたい。各キーが( "識別子" - "値")で構成されているとします。 新しい "識別子"が見つかった場合は、データベースに保存して "値"を保存します。そうでない場合は、行のストリームを処理中に識別子がすでに見つかっている場合は、 "値"だけを保存します。そのためには、すでにマップ内に見つかっている識別子を保持するために、ある種の再帰的なステートフルフローが必要だと思います。私はこの流れで(newLine、contextWithIdentifiers)のペアを受け取ると思います。Akka Streams。ストリーム内のステートフルな状態

私はちょうどakkaストリームを調べ始めました。私はステートレスな処理を行うために自分自身を管理できると思いますが、 "contextWithIdentifiers"を保持する方法についての手がかりはありません。誰かが私に良い方向を向けることができないなら、私は感謝します。

私はScalaを使用しています。

+2

このようなご質問をいただきありがとうございます。このような単純な要求ですが、サンプルコードを使って意味のある答えを見つけ出すのは精巧です。これが私が見つけた唯一のものです! – akauppi

答えて

17

多分、statefulMapConcatのようなものがお手伝いできます。

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Sink, Source} 
import scala.util.Random._ 
import scala.math.abs 
import scala.concurrent.ExecutionContext.Implicits.global 

implicit val system = ActorSystem() 
implicit val materializer = ActorMaterializer() 

//encapsulating your input 
case class IdentValue(id: Int, value: String) 
//some random generated input 
val identValues = List.fill(20)(IdentValue(abs(nextInt()) % 5, "valueHere")) 

val stateFlow = Flow[IdentValue].statefulMapConcat{() => 
    //state with already processed ids 
    var ids = Set.empty[Int] 
    identValue => if (ids.contains(identValue.id)) { 
    //save value to DB 
    println(identValue.value) 
    List(identValue) 
    } else { 
    //save both to database 
    println(identValue) 
    ids = ids + identValue.id 
    List(identValue) 
    } 
} 

Source(identValues) 
    .via(stateFlow) 
    .runWith(Sink.seq) 
    .onSuccess { case identValue => println(identValue) } 
+0

コードをありがとう。真ん中にもう少しタイプがあると分かります。工場が関わっているからです。なぜ '.statefulMap'メソッドがないのか知っていますか? – akauppi

関連する問題