処理されたデータの統計を収集し、結果の統計を具体化するakkaストリームフィルタフローを実装しようとしています。ステートフルフローマテリアライゼーション最終状態
class SFilter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("SFilter.in")
val out = Outlet[A]("SFilter.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
var positive: Long = 0
var negative: Long = 0
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
if (p(elem)) {
push(out, elem)
positive += 1
} else {
pull(in)
negative += 1
}
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
}
これまでのところは良いが、私は私のSFilter[A]
タイプFlow[A,A,(Long,Long)]
であるようになります。どのようにしてコンフルエンスの最後に(positive,negative)
を具体化できますか?
をたぶん、あなたはその答えを探していませんが、何をやるように見えることは基本的に 'fold'です。したがって、自分のステージを書くことなく、既存の 'fold 'コンビネータを再利用することができます。 – jrudolph
@jrudolphはい私はそれについては思いますが、私は不変のアキュムレータでfoldを使うことを好みます。私はこのアプローチを変更可能な(しかし安全な)バージョンと比較しています。 – paradigmatic