2016-06-27 9 views
1

私は二ティックは、この第二の中に受信したすべてのメッセージをフラッシュするために、自己に送信されます。いくつかのマイナーな変更信頼性の高い定期アクタの実装方法

trait PeriodicActor[T] extends DecoratingActor with ActorLogging { 

    import context.dispatcher 
    var messages = new ListBuffer[T] 

    abstract override def preStart() = { 
    schedule() 
    super.preStart() 
    } 

    protected def schedule() { 
    context.system.scheduler.scheduleOnce(1 seconds, self, Tick) 
    } 

    receiver { 
    case Tick => { 
     flush() 
     schedule() 
    } 
    } 

    def flush() = { 
    handleMessages(this.messages) 
     .recover { 
     case NonFatal(e) => log.error(e, "error in actor") 
     } 
    this.messages.clear() 
    } 

    /** 
    * implement to handle buffered messages. 
    */ 
    def handleMessages(messages: ListBuffer[T]): Future[Any] 

    def buffer(msg: T) = { 
    messages.append(msg) 
    } 
} 

毎週1とアッカ・パターンから取らPeriodicActorを使用しています

私の問題は、scheduleOnceが十分に信頼できないということです。 Tickメッセージが受信されなかった可能性があります。つまり、Tickingメカニズムが機能しなくなります。

は、だから私は、これは停止しないことを確認する方法を考えた:バッファ法上の文は、それは例えばことを確認するかどう

  1. はたぶん追加リストは、特定のサイズに取得する場合、それをそれを洗い流すためにTickを送るでしょう。ここで問題となるのは、ティック・メッセージが1つ失われていると、しばらくの間、リストがフラッシュされない可能性があるということです。

  2. 5分ごとにTickメッセージの別のスケジューリングを追加して、メカニズムが正常に機能することを確認することをお勧めします。

    context.system.scheduler.schedule(5分、longInterval、自己、ティック)

あなたはどう思いますか?より良い方法がありますか?

+0

配送が保証されていないため、メッセージが届かない可能性があることに同意します。メッセージが送信されない可能性があります私はスケジューラの信頼性に関する文書を見つけることができません。 – mattinbits

+0

@mattinbitsあなたは正しいです、私の質問を更新しました。ありがとう。 – Tomer

答えて

1

あなたの問題は、Akkaが保証するat-most-once配信セマンティクスに起因します。 5分ごとに追加のTicksを送信することを検討しているので、Ticksを1回配信する必要はないと仮定しましょう。これは、少なくとも1度ティックを納品する必要があることを意味します。

少なくとも何らかのACK-RETRYを使用して達成されます。この場合、askパターンを使用してこれを達成できます。アッカ配信保証の

receiver { 
    case Tick => { 
     sender ! TickAck 
     flush() 
     schedule() 
    } 
    } 

詳細情報::あなたの受信機では

protected def schedule() { 
    context.system.scheduler.scheduleOnce(1 seconds){ 
    sendTick() 
    } 
} 

def sendTick() { 
    implicit val timeout = 1 second 
    (self ? Tick).onComplete{ 
    case Success(_) => schedule() // <- queue up next tick 
    case Failure(_) => sendTick() // <- send another tick immediately 
    } 
} 

は、あなたは確認する必要はあり http://www.mjlivesey.co.uk/2016/02/19/akka-delivery-guarantees.html

としてだけでなく、失われたメッセージの失敗の他の原因を俳優が死にかけています。これには監視戦略が必要です。デフォルトの戦略はアクターを再起動することで、postRestart()メソッドはデフォルトでpreStart()を呼び出します。別の監督方針を使用するか、またはpostRestart()を上書きしない限り、受け入れられる動作を得るはずです。俳優が死亡した場合は、それを再開し、別のチックをスケジュールします。

関連する問題