私はAkka Persistence Queryを理解する際に問題があります。特に、eventsByTagというメソッドは、期待通りに動作しないためです。なぜAkka Persisence Query Read Journalは自分のイベントを取得しないのですか?
私のメインクラスでは、特定のタグで保持されているすべてのイベントのリッスンを開始するクラスを呼び出します。
class CassandraJournal(implicit val system: ActorSystem) {
def engageStreaming = {
val readJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
implicit val mat = ActorMaterializer()
readJournal.eventsByTag("account", Offset.noOffset)
.runForeach { event => println(event) }
}
}
が、私は自分のサーバーを起動するたびに、私のイベント・ストアが空であると私は(アッカHTTPに組み込まれたHTTPサービスを呼び出すことで)私の最初のイベントを持続、イベントが実際に印刷されます。ただし、サーバーを再始動してイベント・ストアにイベントがすでに存在する場合、新しい永続イベントは印刷されません。
説明はありますか?なぜこれが起こっているのか分かりません。私が使用している
EDIT
イベント・ストアは、カサンドラです。ここでreceiveRecover
が必要な状態復旧作業を行っていないと
class Account(id: UUID) extends PersistentActor {
override def receiveRecover: Receive = {
case createCheckingsAccount: CreateCheckingsAccount =>
println("Creating checkings account")
}
override def receiveCommand: Receive = {
case createCheckingsAccount: CreateCheckingsAccount =>
persist(Tagged(CheckingsAccountCreated(id), Set("account"))) { event =>
val checkingsAccountCreatedEvent = event.payload.asInstanceOf[CheckingsAccountCreated]
sender ! CreateCheckingsAccountResponse(checkingsAccountCreatedEvent.id.toString)
}
}
def updateState(evt: Event): Unit = {
}
override def persistenceId: String = s"account-$id"
}
イベントアダプタ、永続アクタ、および永続性のために使用するデータストアに関する質問を更新します。 –