別のアクタからメッセージを送信することで、一時停止/一時停止が可能なGraphStageを作成したいと思います。Akka Stream - Pausable GraphStage(Akka 2.5.7)
次のコードは、乱数を生成する簡単なGraphStage
を示しています。ステージが具体化されると、GraphStageLogic
はStageActor
を含むメッセージ(preStart()
内)を監督者に送信します。監督者はステージのActorRef
を保持しているため、ステージの制御に使用できます。
object RandomNumberSource {
case object Pause
case object UnPause
}
class RandomNumberSource(supervisor: ActorRef) extends GraphStage[SourceShape[Int]] {
val out: Outlet[Int] = Outlet("rnd.out")
override val shape: SourceShape[Int] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new RandomNumberSourceLogic(shape)
}
private class RandomNumberSourceLogic(shape: Shape) extends GraphStageLogic(shape) with StageLogging {
lazy val self: StageActor = getStageActor(onMessage)
val numberGenerator: Random = Random
var isPaused: Boolean = true
override def preStart(): Unit = {
supervisor ! AssignStageActor(self.ref)
}
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (!isPaused) {
push(out, numberGenerator.nextInt())
Thread.sleep(1000)
}
}
})
private def onMessage(x: (ActorRef, Any)): Unit =
{
x._2 match {
case Pause =>
isPaused = true
log.info("Stream paused")
case UnPause =>
isPaused = false
getHandler(out).onPull()
log.info("Stream unpaused!")
case _ =>
}
}
}
}
これは、スーパーバイザー俳優の非常に単純な実装です:アプリケーションがステージを開始すると
object Application extends App {
implicit val system = ActorSystem("my-actor-system")
implicit val materializer = ActorMaterializer()
val supervisor = system.actorOf(Props[Supervisor], "supervisor")
val sourceGraph: Graph[SourceShape[Int], NotUsed] = new RandomNumberSource(supervisor)
val randomNumberSource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph)
randomNumberSource.take(100).runForeach(println)
println("Start stream by pressing any key")
StdIn.readLine()
supervisor ! UnPause
StdIn.readLine()
supervisor ! Pause
StdIn.readLine()
println("=== Terminating ===")
system.terminate()
}
:私はストリームを実行するには、次のアプリケーションを使用してい
object Supervisor {
case class AssignStageActor(ref: ActorRef)
}
class Supervisor extends Actor with ActorLogging {
var stageActor: Option[ActorRef] = None
override def receive: Receive = {
case AssignStageActor(ref) =>
log.info("Stage assigned!")
stageActor = Some(ref)
ref ! Done
case Pause =>
log.info("Pause stream!")
stageActor match {
case Some(ref) => ref ! Pause
case _ =>
}
case UnPause =>
log.info("UnPause stream!")
stageActor match {
case Some(ref) => ref ! UnPause
case _ =>
}
}
}
iaは「一時停止」状態にあり、番号は生成されません。私がキーを押すと、私のステージが数字を放射するようになります。しかし、私の問題は、起動後にステージに送られたすべてのメッセージが無視されることです。私はステージを一時停止することはできません。
私は、アクターから受け取ったメッセージに基づいてステージの動作を変更することに興味がありますが、見つかったすべての例ではアクターのメッセージがストリームに渡されます。
誰かが私のコードがうまくいかない理由を推測しているのですか、そのような方法を構築する方法がありますかGraphStage
?
ありがとうございました!
あなたの答えをありがとう。あなたの提案に基づいて、私は 'AsyncCallback'を使って同じ[RandomNumberSource](https://gist.github.com/kKdH/b03064f1f500f64e0e2058fb82bf0750#file-randomnumbersource-scala)を実装しました。バルブのように使用します。しかし、それと同じ問題です。ストリームが実行されるとすぐに、ステージはコールバックの呼び出しに反応しません。 'push()'が最初に呼び出されるまで問題を絞り込むことができました。あなたは何か考えていますか? – kKdH