1
私は、一連のファイルを受け取り、実行終了時にそれらを1つずつ処理するグラフを持っています。すべての実行が成功または失敗した場合、プログラムは成功(0)または失敗(-1)を返します。
この最後のステップはどのように達成できますか?最後のファイルの結果を受け取ったとき、シンクはどのようにして知ることができましたか?すべての要素が処理されたときにシンクに信号を送る方法は?
val graph = createGraph("path-to-list-of-files")
val result = graph.run()
def createGraph(fileOrPath: String): RunnableGraph[NotUsed] = {
printStage("PREPARING") {
val producer: Source[ProducerFile, NotUsed] = Producer(fileOrPath).toSource()
val validator: Flow[ProducerFile, ProducerFile, NotUsed] = Validator().toFlow()
val provisioner: Flow[ProducerFile, PrivisionerResult, NotUsed] = Provisioner().toFlow()
val executor: Flow[PrivisionerResult, ExecutorResult, NotUsed] = Executor().toFlow()
val evaluator: Flow[ExecutorResult, EvaluatorResult, NotUsed] = Evaluator().toFlow()
val reporter: Sink[EvaluatorResult, Future[Done]] = Reporter().toSink()
val graphResult = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
producer ~> validator ~> provisioner ~> executor ~> evaluator ~> reporter
ClosedShape
})
printLine("The graph pipeline was created")
graphResult
}