2017-04-02 7 views
2

私はAkka Persistence Queryを理解する際に問題があります。特に、eventsByTagというメソッドは、期待通りに動作しないためです。なぜAkka Persisence Query Read Journalは自分のイベントを取得しないのですか?

私のメインクラスでは、特定のタグで保持されているすべてのイベントのリッスンを開始するクラスを呼び出します。

class CassandraJournal(implicit val system: ActorSystem) { 

def engageStreaming = { 
    val readJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) 
    implicit val mat = ActorMaterializer() 

    readJournal.eventsByTag("account", Offset.noOffset) 
    .runForeach { event => println(event) } 
    } 
} 

が、私は自分のサーバーを起動するたびに、私のイベント・ストアが空であると私は(アッカHTTPに組み込まれたHTTPサービスを呼び出すことで)私の最初のイベントを持続、イベントが実際に印刷されます。ただし、サーバーを再始動してイベント・ストアにイベントがすでに存在する場合、新しい永続イベントは印刷されません。

説明はありますか?なぜこれが起こっているのか分かりません。私が使用している

EDIT

イベント・ストアは、カサンドラです。ここでreceiveRecoverが必要な状態復旧作業を行っていないと

class Account(id: UUID) extends PersistentActor { 

    override def receiveRecover: Receive = { 
    case createCheckingsAccount: CreateCheckingsAccount => 
     println("Creating checkings account") 
    } 

    override def receiveCommand: Receive = { 
    case createCheckingsAccount: CreateCheckingsAccount => 
     persist(Tagged(CheckingsAccountCreated(id), Set("account"))) { event => 
     val checkingsAccountCreatedEvent = event.payload.asInstanceOf[CheckingsAccountCreated] 
     sender ! CreateCheckingsAccountResponse(checkingsAccountCreatedEvent.id.toString) 
     } 

    } 

    def updateState(evt: Event): Unit = { 
    } 

    override def persistenceId: String = s"account-$id" 
} 
+0

イベントアダプタ、永続アクタ、および永続性のために使用するデータストアに関する質問を更新します。 –

答えて

2

(私はタグ付き()の周りにそれらをラップし、イベントにタグ付けするには、イベント・アダプターを使用していない)PersistentActorで、持続性が正しく動作しないでしょう。私はいくつかの基本的な状態回復ロジックをreceiveRecoverに入れ、あなたのupdateStateメソッドもタグ付きイベントケースをカバーすることをお勧めします。

私はeventsByTagを次のような状態リカバリロジックを持つアプリケーションで使用しました。これは、新鮮な開始と回復の両方でうまく機能しました。

+0

私は本当に適切な復旧作業を設定することによってそれを働かせることができました。私は回復が新鮮なスタートで実行されるという事実を知らなかった。ご協力いただきありがとうございます! –

関連する問題