2017-11-06 7 views
1

クラスタでシャーディングを使用してakkaのPersistentActorを使用して自分の状態を追跡します。私は、私は次のコード経由で更新することができます「ルーム」を持っている:akka persist関数は毎回処理されません

case UpdateRoom(id, room, userId) => ((ret: ActorRef) => { 
    (userRegion ? GetRoleForRoom(userId, id)).mapTo[String] map { role => 
    if (role == "owner") { 
     state = Some(room) 
     ret ! Success(room) 
     eventRegion ! RoomEventPackage(id, RoomUpdated(room)) 
     println("works") 
     persist(RoomUpdated(room))(e => println("this doesn't always fire") 
    } else { 
     ret ! Failure(InsufficientRights(role, "Update Room")) 
    } 
    } 

問題は、予想通りの機能の残りの部分が動作しながら、他のすべての時間を働くだけ持続ものです。 (「作品」はたびに印刷され、「これは必ずしも発射されない」、お互いに2回)。 私はイベントを保存するためにupdateコマンドを2回起動する必要がありますが、それは両方のコマンドが実行された時点で保存されているようです。

akka persistの重要な部分が欠落していますか?

答えて

1

Actorの世界で重大な間違いをしていると思います。外から俳優(変更可能な)状態にアクセスしています。あなたのケースでは、これはask/?によって返さFutureのコールバック内から2回行われます

  • 更新状態:state = Some(room)
  • persist

に対処する唯一の安全な方法を呼び出しますあなたのActorの中から尋ね、続いて俳優の状態を変更すると、尋問のコールバックから同じ俳優にメッセージを送り、その目的のためにpipeToを使うことができます。

case UpdateRoom(id, room, userId) => 
    val answer = (userRegion ? GetRoleForRoom(userId, id)).mapTo[String] map(role => RoleForRoom(id, room, userId, role)) 
    answer piepTo self 
case RoleForRoom(id, room, userId, room) => 
    if (role == "owner") { 
    state = Some(room) 
    eventRegion ! RoomEventPackage(id, RoomUpdated(room)) 
    persist(RoomUpdated(room))(e => println("this is safe")) 
    } 

も参照してください:https://doc.akka.io/docs/akka/2.5.6/scala/general/jmm.html#actors-and-shared-mutable-state

説明するためにあなたのコードの簡易版を使用して

関連する問題