2017-03-25 9 views
1

GraphStageを書く必要がありますが、いくつかの問題がありました。 私は以下のコードを凝縮して、皆さんが私のためにそれを見てくれることを願っています。Akka StreamsのGraphStageに関する問題

以下のサンプルコードは私の実際の使用例ではありません。私の主張を示すためのものです。うまくいけば、それは私がアクアストリームについて理解していないものであり、それは限界ではない。

サンプルコードでは、WrapFlowShapeを使用してグラフを作成し、基本的にグラフの「in」を添付フローに、グラフの「out」をフロー外にリダイレクトします。

import akka.actor.ActorSystem 
import akka.stream._ 
import akka.stream.javadsl.RunnableGraph 
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source} 
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} 

import scala.collection.immutable 
import scala.io.StdIn 

object WrapFlowSandbox extends App { 
    case class WrapFlowShape[I, O](
     in: Inlet[I], 
     out: Outlet[O], 
     flowIn: Inlet[O], 
     flowOut: Outlet[I]) extends Shape { 
    val inlets: immutable.Seq[Inlet[_]] = in :: flowIn :: Nil 
    val outlets: immutable.Seq[Outlet[_]] = out :: flowOut :: Nil 
    def deepCopy = WrapFlowShape(in.carbonCopy, out.carbonCopy, flowIn.carbonCopy, flowOut.carbonCopy) 
    } 
    class WrapFlow[I, O] extends GraphStage[WrapFlowShape[I, O]] { 
    val in: Inlet[I] = Inlet[I]("WrapFlow.in") 
    val out: Outlet[O] = Outlet[O]("WrapFlow.out") 
    val flowIn: Inlet[O] = Inlet[O](s"Select.flowIn") 
    val flowOut: Outlet[I] = Outlet[I](s"Select.flowOut") 
    val shape: WrapFlowShape[I, O] = WrapFlowShape(in, out, flowIn, flowOut) 
    def createLogic(initialAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { 
     var inElem: I = _ 
     setHandler(in, new InHandler { 
     def onPush = { 
      println("2 in.onPush") 
      inElem = grab(in) 
      pull(flowIn) 
     } 
     }) 
     setHandler(out, new OutHandler { 
     def onPull = { 
      println("1 out.onPull") 
      pull(in) 
     } 
     }) 
     setHandler(flowIn, new InHandler { 
     def onPush = { 
      println("4 flowIn.onPush") 
      val outElem = grab(flowIn) 
      push(out, outElem) 
     } 
     }) 
     setHandler(flowOut, new OutHandler { 
     def onPull = { 
      println("3 flowOut.onPull") 
      push(flowOut, inElem) 
     } 
     }) 
    } 
    } 
    implicit val system = ActorSystem() 
    implicit val materializer = ActorMaterializer() 
    val flow = Flow[Int].map(_ + 1) 
    RunnableGraph.fromGraph(GraphDSL.create() { implicit b => 
    import GraphDSL.Implicits._ 
    val select = b.add(new WrapFlow[Int, Int]) 
    Source.single(1) ~> select.in 
    select.out ~> Sink.foreach[Int](r => println(s"result = $r")) 
    select.flowOut ~> flow ~> select.flowIn 
    ClosedShape 
    }).run(materializer) 
    StdIn.readLine 
    system.terminate 
} 

私が見ることが期待出力である:

1 out.onPull 
2 in.onPush 
3 flowOut.onPull 
4 flowIn.onPush 
result = 2 

実際の出力がちょうど最初の3行である。

1 out.onPull 
2 in.onPush 
3 flowOut.onPull 

InHandler.onPush() "flowIn" のためのものです決して呼び出されません。

このようにGraphStageを書くのは非凡なことですが、私はそれを必要としています。私を困惑何

は私がステップ2(プル(flowIn))、 と順番に付着流にそれを引っ張ることにより、付着流の需要を生成し、ステップ3

で「流出」の需要が発生したということです

ただし、手順3でflowOutを使用して要素をプッシュした後は、要素はプッシュされないため、手順4は実行されませんでした。

なぜですか?

添付のフローが下流の需要を感知し、ステップ3で上流に需要を生成する場合、ステップ3でプッシュされた要素が添付ストリームに通過しないのはなぜですか?

+0

Btw。あなたのシェイプはすでにakkaで 'BidiShape'として定義されています。答えのために – jrudolph

答えて

1

私はあなたのハンドラのロジックに従っているかわかりません。私はあなたのGraphDSL.create()内容から理解した内容に基づいて、以下にそれらを改訂:それは次の出力を生成する必要があり実行

def createLogic(initialAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { 
    var inElem: I = _ 
    setHandler(in, new InHandler { 
    def onPush = { 
     println("in.onPush") 
     inElem = grab(in) 
     push(flowOut, inElem) 
    } 
    }) 
    setHandler(out, new OutHandler { 
    def onPull = { 
     println("out.onPull") 
     pull(flowIn) 
    } 
    }) 
    setHandler(flowIn, new InHandler { 
    def onPush = { 
     println("flowIn.onPush") 
     val outElem = grab(flowIn) 
     push(out, outElem) 
    } 
    }) 
    setHandler(flowOut, new OutHandler { 
    def onPull = { 
     println("flowOut.onPull") 
     pull(in) 
    } 
    }) 
} 

out.onPull 
flowOut.onPull 
in.onPush 
flowIn.onPush 
result = 2 

は方法copyFromPorts()があなたのWrapFlowShape場合には上書きされていないことに気づきましたクラス(抽象クラ​​スではない)私はあなたが次のようなものでそれを上書きする必要があると思う:

+0

ありがとう。それは動作しますが、私のユースケースはそれより少し複雑です。私はそれを単純化しましたが、実際には複数のflowInsとflowOutsがあります。私はinElemを取得する必要があります前に、私はどのflowInsを引き出すかを知っている。だから私はそれらのすべてを外に引き出すことはできない。 –

+0

btw、Shapeは抽象クラスでdeepCopyも抽象クラスなので、オーバーライドは必要ありません。 –

関連する問題