2017-12-03 13 views
1

別のアクタからメッセージを送信することで、一時停止/一時停止が可能なGraphStageを作成したいと思います。Akka Stream - Pausable GraphStage(Akka 2.5.7)

次のコードは、乱数を生成する簡単なGraphStageを示しています。ステージが具体化されると、GraphStageLogicStageActorを含むメッセージ(preStart()内)を監督者に送信します。監督者はステージのActorRefを保持しているため、ステージの制御に使用できます。

object RandomNumberSource { 
    case object Pause 
    case object UnPause 
} 

class RandomNumberSource(supervisor: ActorRef) extends GraphStage[SourceShape[Int]] { 

    val out: Outlet[Int] = Outlet("rnd.out") 

    override val shape: SourceShape[Int] = SourceShape(out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { 
    new RandomNumberSourceLogic(shape) 
    } 

    private class RandomNumberSourceLogic(shape: Shape) extends GraphStageLogic(shape) with StageLogging { 

    lazy val self: StageActor = getStageActor(onMessage) 

    val numberGenerator: Random = Random 
    var isPaused: Boolean = true 

     override def preStart(): Unit = { 
     supervisor ! AssignStageActor(self.ref) 
     } 

     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      if (!isPaused) { 
      push(out, numberGenerator.nextInt()) 
      Thread.sleep(1000) 
      } 
     } 
     }) 

     private def onMessage(x: (ActorRef, Any)): Unit = 
     { 
     x._2 match { 
      case Pause => 
      isPaused = true 
      log.info("Stream paused") 
      case UnPause => 
      isPaused = false 
      getHandler(out).onPull() 
      log.info("Stream unpaused!") 
      case _ => 
     } 
     } 
    } 
} 

これは、スーパーバイザー俳優の非常に単純な実装です:アプリケーションがステージを開始すると

object Application extends App { 

    implicit val system = ActorSystem("my-actor-system") 
    implicit val materializer = ActorMaterializer() 

    val supervisor = system.actorOf(Props[Supervisor], "supervisor") 

    val sourceGraph: Graph[SourceShape[Int], NotUsed] = new RandomNumberSource(supervisor) 
    val randomNumberSource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph) 

    randomNumberSource.take(100).runForeach(println) 

    println("Start stream by pressing any key") 

    StdIn.readLine() 

    supervisor ! UnPause 

    StdIn.readLine() 

    supervisor ! Pause 

    StdIn.readLine() 

    println("=== Terminating ===") 
    system.terminate() 
} 

:私はストリームを実行するには、次のアプリケーションを使用してい

object Supervisor { 
    case class AssignStageActor(ref: ActorRef) 
} 

class Supervisor extends Actor with ActorLogging { 

    var stageActor: Option[ActorRef] = None 

    override def receive: Receive = { 

    case AssignStageActor(ref) => 
     log.info("Stage assigned!") 
     stageActor = Some(ref) 
     ref ! Done 

    case Pause => 
     log.info("Pause stream!") 
     stageActor match { 
     case Some(ref) => ref ! Pause 
     case _ => 
     } 

    case UnPause => 
     log.info("UnPause stream!") 
     stageActor match { 
     case Some(ref) => ref ! UnPause 
     case _ => 
     } 
    } 
} 

iaは「一時停止」状態にあり、番号は生成されません。私がキーを押すと、私のステージが数字を放射するようになります。しかし、私の問題は、起動後にステージに送られたすべてのメッセージが無視されることです。私はステージを一時停止することはできません。

私は、アクターから受け取ったメッセージに基づいてステージの動作を変更することに興味がありますが、見つかったすべての例ではアクターのメッセージがストリームに渡されます。

誰かが私のコードがうまくいかない理由を推測しているのですか、そのような方法を構築する方法がありますかGraphStage

ありがとうございました!

答えて

1

Akka Stream Contribプロジェクトには、フローを一時停止および再開できる値を実現するValveステージがあります。このクラスのScaladocから:

ステージを通過する要素の流れを停止または再起動方法フリップを提供ValveSwitchの将来に具体化。バルブが閉じている限り、背圧がかかります。例えば

val (switchFut, seqSink) = Source(1 to 10) 
    .viaMat(new Valve(SwitchMode.Close))(Keep.right) 
    .toMat(Sink.seq)(Keep.both) 
    .run() 

switchFutFuture[ValveSwitch]であり、スイッチが最初に閉じられているので、バルブの背圧と何も下流側に放出されます。バルブを開くには:には、バルブを開くには:

switchFut.onComplete { 
    case Success(switch) => 
    switch.flip(SwitchMode.Open) // Future[Boolean] 
    case _ => 
    log.error("the valve failed") 
} 

もっと例があります。

+0

あなたの答えをありがとう。あなたの提案に基づいて、私は 'AsyncCallback'を使って同じ[RandomNumberSource](https://gist.github.com/kKdH/b03064f1f500f64e0e2058fb82bf0750#file-randomnumbersource-scala)を実装しました。バルブのように使用します。しかし、それと同じ問題です。ストリームが実行されるとすぐに、ステージはコールバックの呼び出しに反応しません。 'push()'が最初に呼び出されるまで問題を絞り込むことができました。あなたは何か考えていますか? – kKdH

関連する問題