GraphStageを書く必要がありますが、いくつかの問題がありました。 私は以下のコードを凝縮して、皆さんが私のためにそれを見てくれることを願っています。Akka StreamsのGraphStageに関する問題
以下のサンプルコードは私の実際の使用例ではありません。私の主張を示すためのものです。うまくいけば、それは私がアクアストリームについて理解していないものであり、それは限界ではない。
サンプルコードでは、WrapFlowShapeを使用してグラフを作成し、基本的にグラフの「in」を添付フローに、グラフの「out」をフロー外にリダイレクトします。
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.javadsl.RunnableGraph
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.collection.immutable
import scala.io.StdIn
object WrapFlowSandbox extends App {
case class WrapFlowShape[I, O](
in: Inlet[I],
out: Outlet[O],
flowIn: Inlet[O],
flowOut: Outlet[I]) extends Shape {
val inlets: immutable.Seq[Inlet[_]] = in :: flowIn :: Nil
val outlets: immutable.Seq[Outlet[_]] = out :: flowOut :: Nil
def deepCopy = WrapFlowShape(in.carbonCopy, out.carbonCopy, flowIn.carbonCopy, flowOut.carbonCopy)
}
class WrapFlow[I, O] extends GraphStage[WrapFlowShape[I, O]] {
val in: Inlet[I] = Inlet[I]("WrapFlow.in")
val out: Outlet[O] = Outlet[O]("WrapFlow.out")
val flowIn: Inlet[O] = Inlet[O](s"Select.flowIn")
val flowOut: Outlet[I] = Outlet[I](s"Select.flowOut")
val shape: WrapFlowShape[I, O] = WrapFlowShape(in, out, flowIn, flowOut)
def createLogic(initialAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
var inElem: I = _
setHandler(in, new InHandler {
def onPush = {
println("2 in.onPush")
inElem = grab(in)
pull(flowIn)
}
})
setHandler(out, new OutHandler {
def onPull = {
println("1 out.onPull")
pull(in)
}
})
setHandler(flowIn, new InHandler {
def onPush = {
println("4 flowIn.onPush")
val outElem = grab(flowIn)
push(out, outElem)
}
})
setHandler(flowOut, new OutHandler {
def onPull = {
println("3 flowOut.onPull")
push(flowOut, inElem)
}
})
}
}
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val flow = Flow[Int].map(_ + 1)
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val select = b.add(new WrapFlow[Int, Int])
Source.single(1) ~> select.in
select.out ~> Sink.foreach[Int](r => println(s"result = $r"))
select.flowOut ~> flow ~> select.flowIn
ClosedShape
}).run(materializer)
StdIn.readLine
system.terminate
}
私が見ることが期待出力である:
1 out.onPull
2 in.onPush
3 flowOut.onPull
4 flowIn.onPush
result = 2
実際の出力がちょうど最初の3行である。
1 out.onPull
2 in.onPush
3 flowOut.onPull
InHandler.onPush() "flowIn" のためのものです決して呼び出されません。
このようにGraphStageを書くのは非凡なことですが、私はそれを必要としています。私を困惑何
は私がステップ2(プル(flowIn))、 と順番に付着流にそれを引っ張ることにより、付着流の需要を生成し、ステップ3
で「流出」の需要が発生したということですただし、手順3でflowOutを使用して要素をプッシュした後は、要素はプッシュされないため、手順4は実行されませんでした。
なぜですか?
添付のフローが下流の需要を感知し、ステップ3で上流に需要を生成する場合、ステップ3でプッシュされた要素が添付ストリームに通過しないのはなぜですか?
Btw。あなたのシェイプはすでにakkaで 'BidiShape'として定義されています。答えのために – jrudolph