2016-03-31 8 views
2

キーでグループ化されたファイルからデータストリームを処理しています。 KeyChanges [T、K]という名前のキーでストリームを分割するために使用できるapplyメソッドを持つクラスを作成しました。サブストリームの最初の項目が処理される前に、DBからデータを取得する必要があります。各サブストリームが完了したら、キューにメッセージを送信する必要があります。私はアッカストリームと似た何かをしたいと思いますakka-streamsのサブストリームの前後でアクションを実行しています。+ +

val groups: Map[Key, Seq[Value]] = stream.groupBy(v => v.k) 
val groupSummaryF = Future.sequence(groups.map { case (k, group) => 
    retrieveMyData(k).flatMap { data => 
    Future.sequence(group.map(v => process(data, v))).map(
     k -> _.foldLeft(0) { (a,t) => 
     t match { 
      case Success(v) => a + 1 
      case Failure(ex) => 
      println(s"failure: $ex") 
      a 
     } 
     } 
    ).andThen { 
     case Success((key,count)) => 
     sendMessage(count,key) 
    } 
    } 
}) 

:標準のScalaシーケンスでは、私はこのような何かをするだろう。データ検索では、データをキャッシュし、各要素の検索機能を呼び出すことができますが、キューメッセージについては、サブストリームがいつ完了するかを知る必要があります。これまで私はこれを回避する方法を見ていない。何か案は?

答えて

1

ストリームを実行して、シンクからアクションを実行することができます。

val categories = Array("DEBUG", "INFO", "WARN", "ERROR") 

// assume we have a stream from file which produces categoryId -> message 
val lines = (1 to 100).map(x => (Random.nextInt(categories.length), s"message $x")) 

def loadDataFromDatabase(categoryId: Int): Future[String] = 
    Future.successful(categories(categoryId)) 

// assume this emits message to the queue 
def emitToQueue(x: (String, Int)): Unit = 
    println(s"${x._2} messages from category ${x._1}") 

val flow = 
    Flow[(Int, String)]. 
    groupBy(4, _._1). 
    fold((0, List.empty[String])) { case ((_, acc), (catId, elem)) => 
     (catId, elem :: acc) 
    }. 
    mapAsync(1) { case (catId, messages) => 
     // here you load your stuff from the database 
     loadDataFromDatabase(catId).map(cat => (cat, messages)) 
    }. // here you may want to do some more processing 
    map(x => (x._1, x._2.size)). 
    mergeSubstreams 

// assume the source is a file 
Source.fromIterator(() => lines.iterator). 
via(flow). 
to(Sink.foreach(emitToQueue)).run() 

複数のファイルに対して実行し、合計を1回報告する場合は、そのようにすることができます。

val futures = (1 to 4).map { x => 
    Source.fromIterator(() => lines.iterator).via(flow).toMat(Sink.seq[(String, Int)])(Keep.right).run() 
} 
Future.sequence(futures).map { results => 
    results.flatten.groupBy(_._1).foreach { case (cat, xs) => 
    val total = xs.map(_._2).sum 
    println(s"$total messages from category $cat") 
    } 
} 

あなたがフローを実行したとき、あなたは将来を取得し、見ての通り。終了時には、マテリアライズされた値(フローの結果)が含まれ、必要に応じて処理できます。

+0

サブストリームをリストに折りたたんだ解決策がありましたが、サブストリームが非常に大きいとメモリ消費量が高くなる可能性があるようでした。私は実際のユースケースでは、mergeSubstreamsの後に行われる結果をさらに要約しているので、キューの排出を許可するために "alsoTo"を折り返しの後に使用できることも発見しました。 – AlphaGeek

+0

値を必要としない場合は、リストには折りたたむことができず、数値にすることができます。多分、あなたが必要とするものとデータのサイズがもっと正確であれば、もっと良い解決法を提案することができます。 – lpiepiora

関連する問題