2016-09-27 3 views
0

私はakkaスケジューラの使用法のいくつかの例を探しています。私は、あるデータベースからデータを取得するための1つのアクター(dataProducerと呼ぶ)を実装しています。私は、5秒間隔でdataProducerアクターをポールするスケジューラーアクターを1つ作成したいと思います。また、データ検索にスケジューラ間隔よりも時間がかかる場合は、ケースをどのように処理するか。スケジューラのアクタのscheduleOnceメソッドがこれを処理しますか?ここでakkaスケジューラを使用したデータプッシュ

import java.util.concurrent.{Executors, TimeUnit} 
import akka.actor.{Actor, Cancellable, Props} 
import scala.concurrent.ExecutionContext 

class SchedulerActor(interval: Long) extends Actor with LogF{ 

    implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(100)) 

    private var scheduler: Cancellable = _ 

    override def preStart(): Unit = { 
    import scala.concurrent.duration._ 
    scheduler = context.system.scheduler.schedule(
     initialDelay = 0 seconds, 
     interval = FiniteDuration(interval, TimeUnit.SECONDS), 
     receiver = self, 
     message = FetchData 
    ) 
    } 

    override def postStop(): Unit = { 
    scheduler.cancel() 
    } 

    def receive = { 
    case FetchData => 
     logger.debug("Fetch Data") 
     sender() ! "Data Fetched!!!" //here I'll call dataProducer API 
     true 
    case unknown => 
     throw new RuntimeException("ERROR: Received unknown message [" + unknown + "], can't handle it") 
    } 

} 

object SchedulerActor { 

    def props(interval: Long): Props = Props(new SchedulerActor(interval)) 
} 

sealed trait FetchDataMessage 
case object FetchData extends FetchDataMessage 
+0

意味のある回答を得たい場合は、これまでに試したことを示すことをおすすめします。 – hasumedic

+0

私の質問が更新されました... – Abhay

答えて

0

スケジューラのscheduleOnceは、いくつかの遅延の後の部分を実行するのに役立ちます私のスケジューラの俳優です。 さまざまな状態を持ち、異なる種類のメッセージを受け入れ、それに応じて動作するように状態を切り替えます。しかし、タイムアウトが発生すると、scheduleOnceはtimeoutStateに移動します。

ScheduleOnceは、タイムアウトが発生したことを俳優に知らせるのに役立ちます。

データ取得にスケジューラ間隔よりも時間がかかる場合はどうすれば対応できますか?

データフェッチが指定された時間を超えると、アクターの状態の変更がtimeoutStateになり、タイムアウト状態でアクターに何をすべきかが示されます。再試行するか、別のソースを試してみてください。私は5秒dataProducerを要求するために遅らせ、全部が再び繰り返されるとscheduleOnceのための結果状態待ちになり5秒でポールdataProducer俳優の間隔

1人のスケジューラ俳優を書きたいと思い

このコードをチェックして、どのように実行できるかを理解してください。

import akka.actor.{Actor, Cancellable} 
import stackoverflow.DBUtils.Entity 

import scala.concurrent.Future 
import scala.concurrent.duration._ 
import akka.pattern.pipe 


object DBPollActor { 
    case class Result(results: List[Entity]) 
    case object Schedule 
    case object Timeup 
    case object FetchData 
} 

object DBUtils { 
    case class Entity(name: String) 

    def doDBOperation: Future[List[Entity]] = { 
    Future.successful(List(Entity(name = "foo"))) 
    } 

} 

class DBPollActor(timeout: Int) extends Actor { 

    import DBPollActor._ 

    implicit val ex = context.system.dispatcher 

    var schedulerOpt: Option[Cancellable] = None 

    @scala.throws[Exception](classOf[Exception]) 
    override def preStart(): Unit = { 
    super.preStart() 
    self ! FetchData 
    schedulerOpt = Some(context.system.scheduler.scheduleOnce(timeout seconds) { 
     context become timeoutState 
     self ! Timeup 
    }) 
    } 

    override def receive: Receive = { 
    case [email protected] => 
     context become startState 
     self forward msg 
    } 

    def startState: Receive = { 
    case FetchData => 
     schedulerOpt.map(_.cancel()) 
     context become resultState 
     DBUtils.doDBOperation.map(Result) pipeTo self 
     schedulerOpt = Some(context.system.scheduler.scheduleOnce(timeout seconds) { 
     context become timeoutState 
     self ! Timeup 
     }) 
    } 

    def timeoutState: Receive = { 
    case Timeup => 
     schedulerOpt.map(_.cancel()) 
     //Timeout happened do something or repeat 
    } 

    def resultState: Receive = { 
    case [email protected](list) => 
     schedulerOpt.map(_.cancel()) 
     //Result available consume the result and repeat or doSomething different 
    context become resultState 
     DBUtils.doDBOperation.map(Result) pipeTo self 
     schedulerOpt = Some(context.system.scheduler.scheduleOnce(timeout seconds) { 
     context become timeoutState 
     self ! Timeup 
     }) 

    case ex: Exception => 
     schedulerOpt.map(_.cancel()) 
     //future failed exit or retry 
    } 
} 
関連する問題