2017-10-30 17 views
0

私はAkka、より具体的にはAkka Persistenceを初めて実験しています。私は最終的にイベントソーシングアプリケーションでのAkkaの使用を複製する小さなおもちゃプログラムを実装しようとしています。私はReadJournalを使って私のイベントストリームを自分のドメインに投影しようとするまで、成功しました。Akka Persistence:ReadJournal.runFold never returns

def main(args: Array[String]): Unit = { 
    val commands: EmployeeCommandStream = TestEmployeeCommandStream(EmployeeId.generate()) 

    implicit val executionContext = ExecutionContext.global 
    implicit val system = ActorSystem.create("employee-service-actor-system") 
    implicit val mat: Materializer = ActorMaterializer()(system) 

    val service = system.actorOf(Props(classOf[EmployeeActor], commands.employeeId)) 

    commands.stream.foreach(command => service.tell(command, noSender)) 

    lazy val readJournal = PersistenceQuery(system).readJournalFor("inmemory-read-journal") 
     .asInstanceOf[ReadJournal 
     with CurrentPersistenceIdsQuery 
     with CurrentEventsByPersistenceIdQuery 
     with CurrentEventsByTagQuery 
     with EventsByPersistenceIdQuery 
     with EventsByTagQuery] 

    println(Await.result(
     readJournal 
     .eventsByPersistenceId(commands.employeeId.toString, 0L, Long.MaxValue) 
     .map(_.event) 
     .runFold(Employee.apply())({ 
      case (employee: Employee, event: EmployeeEvent) => employee.apply(event) 
     }), 
     Duration("10s") 
    ))  
} 

マイドメインの唯一の集計はEmployeeあるので、私はいくつかの従業員を表すUUIDを持つ俳優を始めている、そして私は、その従業員のためにいくつかのコマンドを発行しています。

上記の例では、println(Await.result(...))を削除し、.runFold(...).runForeach(println)に置き換えた場合、与えられたコマンドごとに、自分の俳優で永続化されたイベントが期待どおりに出力されます。だから私は私のプログラムとReadJournalの書き込み側が両方とも期待どおりに動作していることを知っています。 、私のプログラムは、だから今

Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [10 seconds] 

で終了すると、

私の質問は、なぜ私は最終的に私のイベントストリームを再生するrunFold実行することはできませんか?これを行うより良い方法はありますか?私は単にAPIを悪用していますか?

ありがとうございます、ありがとうございます!

答えて

1

runFoldを使用すると、ストリームを折りたたんでいます。フォールドは、ストリーム自体が終了すると効果的に終了します。

eventsByPersistenceIdを使用すると、ライブイベントの終わりのないストリームを要求しているため、あなたのフォールドは終了しません。

代わりにcurrentEventsByPersistenceIdを使用してください。このバリアントは、ジャーナルで現在利用可能なイベントをストリーミングして終了します。

https://doc.akka.io/docs/akka/2.5.6/scala/persistence-query.html#eventsbypersistenceidquery-and-currenteventsbypersistenceidqueryを参照してください

+0

ああ、あなたの助けに感謝 - 私はドキュメントで見つけることができませんでした1つの物事:ストリームが終了することができる方法は何ですか?私は 'system.stop(service)'を試み、 'registerOnTerminate(...) 'というコールバックを付けてプログラムを終了しました.Akkaがストリームの終了を提供する簡単な方法はありますか?ターミナル「シンク」を提供する鍵は、ストリームが最終的にどのように終了すべきかという考えがあるかどうかです。 – lyonssp

+0

'KillSwitch'esについては、https://doc.akka.io/docs/akka/current/scala/stream/stream-dynamic.html#uniquekillswitchとhttps://doc.akka.io/docs/akka/currentを参照してください。 /scala/stream/stream-dynamic.html#sharedkillswitch –

+0

あなたは伝説、感謝の芽です。私は次回にドキュメントを深く掘り下げることを約束します:p – lyonssp