2017-12-01 15 views
0

次の形式の実行可能なグラフがあります。ClosedShapeストリームの完了を待ちます

def getGraph[T](sequence: Seq[T], tickConsumers: Map[T, ActorRef]) = 
    RunnableGraph.fromGraph(
    GraphDSL.create() { implicit builder => 
     val ticker = builder.add(new SomeTickProducer)) 
     val broadcast = builder.add(Broadcast[T](sequence.length)) 
     ticker ~> broadcast 
     sequence.foreach { item => 
     broadcast ~> 
     builder.add(new SomeTickProcesser(item)) ~> 
     Sink.actorRef(tickConsumers(item), NotUsed) 
     } 
     ClosedShape 
    } 
) 

私のグラフは閉じていてデータを生成しないので、私は実現できません。実際には、外部サービスから何らかのデータを受信し、処理し、複数の消費者にブロードキャストします。

限り、SomeTickProducerは、外部サービスに対してかなりの負荷を生成します。これ以上のグラフを実行する必要はありません。グラフを何かに変換する方法はありますか?Futureと同じですか?Awaitを使用して終了するのを待ちますか?または、ここにいくつかのキューを整理するための良い方法がありますか?

+0

コール '.RUN()' – cchantep

+0

うんを具現するために、。 '.run()'は 'Unit'を返し、アクターをブロックしません。 –

答えて

0

グラフを作成して未来を実現するには、メソッドにSink.ignoreのような方法を与える必要があります。その後、あなたはGraphDSL建築家の中からそのステージにアクセスします。 SomeTickProducerと仮定すると

あなたはSomeTickProducerの終了を監視し、グラフのコンセントにそれを転送することができ、最終的に完了しますSourceです。

次に、グラフをマテリアライズすると、タイプはFuture[Done]になります。

これは次のように動作します:私はしようとした最初のものだった

def getGraph[T](sequence: Seq[T], tickConsumers: Map[T, ActorRef]) = 
    RunnableGraph.fromGraph(
    GraphDSL.create(Sink.ignore) { implicit builder => out => 
     val producer = new SomeTickProducer 
     val ticker = builder.add(producer) 
     val broadcast = builder.add(Broadcast[T](sequence.length)) 
     ticker ~> broadcast 
     sequence.foreach { item => 
     broadcast ~> 
     builder.add(new SomeTickProcesser(item)) ~> 
     Sink.actorRef(tickConsumers(item), NotUsed) 
     } 
     producer.watchTermination()(Keep.none) ~> out 
     ClosedShape 
    } 
) 
関連する問題