私はAkka、より具体的にはAkka Persistenceを初めて実験しています。私は最終的にイベントソーシングアプリケーションでのAkkaの使用を複製する小さなおもちゃプログラムを実装しようとしています。私はReadJournal
を使って私のイベントストリームを自分のドメインに投影しようとするまで、成功しました。Akka Persistence:ReadJournal.runFold never returns
def main(args: Array[String]): Unit = {
val commands: EmployeeCommandStream = TestEmployeeCommandStream(EmployeeId.generate())
implicit val executionContext = ExecutionContext.global
implicit val system = ActorSystem.create("employee-service-actor-system")
implicit val mat: Materializer = ActorMaterializer()(system)
val service = system.actorOf(Props(classOf[EmployeeActor], commands.employeeId))
commands.stream.foreach(command => service.tell(command, noSender))
lazy val readJournal = PersistenceQuery(system).readJournalFor("inmemory-read-journal")
.asInstanceOf[ReadJournal
with CurrentPersistenceIdsQuery
with CurrentEventsByPersistenceIdQuery
with CurrentEventsByTagQuery
with EventsByPersistenceIdQuery
with EventsByTagQuery]
println(Await.result(
readJournal
.eventsByPersistenceId(commands.employeeId.toString, 0L, Long.MaxValue)
.map(_.event)
.runFold(Employee.apply())({
case (employee: Employee, event: EmployeeEvent) => employee.apply(event)
}),
Duration("10s")
))
}
マイドメインの唯一の集計はEmployee
あるので、私はいくつかの従業員を表すUUIDを持つ俳優を始めている、そして私は、その従業員のためにいくつかのコマンドを発行しています。
上記の例では、println(Await.result(...))
を削除し、.runFold(...)
を.runForeach(println)
に置き換えた場合、与えられたコマンドごとに、自分の俳優で永続化されたイベントが期待どおりに出力されます。だから私は私のプログラムとReadJournal
の書き込み側が両方とも期待どおりに動作していることを知っています。 、私のプログラムは、だから今
Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
で終了すると、
私の質問は、なぜ私は最終的に私のイベントストリームを再生するrunFold
実行することはできませんか?これを行うより良い方法はありますか?私は単にAPIを悪用していますか?
ありがとうございます、ありがとうございます!
ああ、あなたの助けに感謝 - 私はドキュメントで見つけることができませんでした1つの物事:ストリームが終了することができる方法は何ですか?私は 'system.stop(service)'を試み、 'registerOnTerminate(...) 'というコールバックを付けてプログラムを終了しました.Akkaがストリームの終了を提供する簡単な方法はありますか?ターミナル「シンク」を提供する鍵は、ストリームが最終的にどのように終了すべきかという考えがあるかどうかです。 – lyonssp
'KillSwitch'esについては、https://doc.akka.io/docs/akka/current/scala/stream/stream-dynamic.html#uniquekillswitchとhttps://doc.akka.io/docs/akka/currentを参照してください。 /scala/stream/stream-dynamic.html#sharedkillswitch –
あなたは伝説、感謝の芽です。私は次回にドキュメントを深く掘り下げることを約束します:p – lyonssp