キーでグループ化されたファイルからデータストリームを処理しています。 KeyChanges [T、K]という名前のキーでストリームを分割するために使用できるapplyメソッドを持つクラスを作成しました。サブストリームの最初の項目が処理される前に、DBからデータを取得する必要があります。各サブストリームが完了したら、キューにメッセージを送信する必要があります。私はアッカストリームと似た何かをしたいと思いますakka-streamsのサブストリームの前後でアクションを実行しています。+ +
val groups: Map[Key, Seq[Value]] = stream.groupBy(v => v.k)
val groupSummaryF = Future.sequence(groups.map { case (k, group) =>
retrieveMyData(k).flatMap { data =>
Future.sequence(group.map(v => process(data, v))).map(
k -> _.foldLeft(0) { (a,t) =>
t match {
case Success(v) => a + 1
case Failure(ex) =>
println(s"failure: $ex")
a
}
}
).andThen {
case Success((key,count)) =>
sendMessage(count,key)
}
}
})
:標準のScalaシーケンスでは、私はこのような何かをするだろう。データ検索では、データをキャッシュし、各要素の検索機能を呼び出すことができますが、キューメッセージについては、サブストリームがいつ完了するかを知る必要があります。これまで私はこれを回避する方法を見ていない。何か案は?
サブストリームをリストに折りたたんだ解決策がありましたが、サブストリームが非常に大きいとメモリ消費量が高くなる可能性があるようでした。私は実際のユースケースでは、mergeSubstreamsの後に行われる結果をさらに要約しているので、キューの排出を許可するために "alsoTo"を折り返しの後に使用できることも発見しました。 – AlphaGeek
値を必要としない場合は、リストには折りたたむことができず、数値にすることができます。多分、あなたが必要とするものとデータのサイズがもっと正確であれば、もっと良い解決法を提案することができます。 – lpiepiora