次の形式の実行可能なグラフがあります。ClosedShapeストリームの完了を待ちます
def getGraph[T](sequence: Seq[T], tickConsumers: Map[T, ActorRef]) =
RunnableGraph.fromGraph(
GraphDSL.create() { implicit builder =>
val ticker = builder.add(new SomeTickProducer))
val broadcast = builder.add(Broadcast[T](sequence.length))
ticker ~> broadcast
sequence.foreach { item =>
broadcast ~>
builder.add(new SomeTickProcesser(item)) ~>
Sink.actorRef(tickConsumers(item), NotUsed)
}
ClosedShape
}
)
私のグラフは閉じていてデータを生成しないので、私は実現できません。実際には、外部サービスから何らかのデータを受信し、処理し、複数の消費者にブロードキャストします。
限り、SomeTickProducer
は、外部サービスに対してかなりの負荷を生成します。これ以上のグラフを実行する必要はありません。グラフを何かに変換する方法はありますか?Future
と同じですか?Await
を使用して終了するのを待ちますか?または、ここにいくつかのキューを整理するための良い方法がありますか?
コール '.RUN()' – cchantep
うんを具現するために、。 '.run()'は 'Unit'を返し、アクターをブロックしません。 –