2017-08-22 18 views
0

処理されたデータの統計を収集し、結果の統計を具体化するakkaストリームフィルタフローを実装しようとしています。ステートフルフローマテリアライゼーション最終状態

class SFilter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] { 
    val in = Inlet[A]("SFilter.in") 
    val out = Outlet[A]("SFilter.out") 
    val shape = FlowShape.of(in, out) 
    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) { 
     var positive: Long = 0 
     var negative: Long = 0 
     setHandler(in, new InHandler { 
     override def onPush(): Unit = { 
      val elem = grab(in) 
      if (p(elem)) { 
      push(out, elem) 
      positive += 1 
      } else { 
      pull(in) 
      negative += 1 
      } 
     } 
     }) 
     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      pull(in) 
     } 
     }) 
    } 
} 

これまでのところは良いが、私は私のSFilter[A]タイプFlow[A,A,(Long,Long)]であるようになります。どのようにしてコンフルエンスの最後に(positive,negative)を具体化できますか?

+0

をたぶん、あなたはその答えを探していませんが、何をやるように見えることは基本的に 'fold'です。したがって、自分のステージを書くことなく、既存の 'fold 'コンビネータを再利用することができます。 – jrudolph

+0

@jrudolphはい私はそれについては思いますが、私は不変のアキュムレータでfoldを使うことを好みます。私はこのアプローチを変更可能な(しかし安全な)バージョンと比較しています。 – paradigmatic

答えて

0

Tuple2[Long, Long]は実行中のストリーム自体に依存するため、Tuple2[Long, Long]をマテリアライズすることはできません。ただし、Future[Tuple2[Long, Long]]を実体化することができます。このストリームは、ストリームの完了時に完了します。

編集:通常のフィルターとSFilterを区別できるように、カスタムステージに異なる名前を付けたいとします。ヴィクトルクランの提案に

+0

'Future [Tuple2 [Long、Long]]'も良いでしょう。しかしどうですか?私はあなたの提案に従って名前を変えます。 – paradigmatic

+0

@paradigmaticドキュメントのこのセクションを読んでください:http://doc.akka.io/docs/akka/current/scala/stream/stream-customize.html#custom-materialized-values –

+0

@Viktor_Klangありがとう、私はできましたリンク – paradigmatic

0

おかげで、私は以下のソリューションを実装することができました:

class SFilter[A](p: A => Boolean) extends GraphStageWithMaterializedValue[FlowShape[A,A],Future[(Long,Long)]] { 
    val in = Inlet[A]("SFilter.in") 
    val out = Outlet[A]("SFilter.out") 
    val shape = FlowShape.of(in, out) 
    override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { 
    val result = Promise[(Long,Long)]() 
    val logic = new GraphStageLogic(shape) { 
     var positive: Long = 0 
     var negative: Long = 0 
     setHandler(in, new InHandler { 
     override def onPush(): Unit = { 
      val elem = grab(in) 
      if (p(elem)) { 
      push(out, elem) 
      positive += 1 
      } else { 
      pull(in) 
      negative += 1 
      } 
     } 
     override def onUpstreamFinish(): Unit = { 
      result.success((positive,negative)) 
      completeStage() 
     } 
     }) 
     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      pull(in) 
     } 
     }) 
     (logic, result.future) 
    } 
} 
関連する問題