2017-05-07 4 views
1

私は、一連のファイルを受け取り、実行終了時にそれらを1つずつ処理するグラフを持っています。すべての実行が成功または失敗した場合、プログラムは成功(0)または失敗(-1)を返します。
この最後のステップはどのように達成できますか?最後のファイルの結果を受け取ったとき、シンクはどのようにして知ることができましたか?すべての要素が処理されたときにシンクに信号を送る方法は?

val graph = createGraph("path-to-list-of-files") 
val result = graph.run() 

def createGraph(fileOrPath: String): RunnableGraph[NotUsed] = { 
    printStage("PREPARING") { 
    val producer: Source[ProducerFile, NotUsed] = Producer(fileOrPath).toSource() 
    val validator: Flow[ProducerFile, ProducerFile, NotUsed] = Validator().toFlow() 
    val provisioner: Flow[ProducerFile, PrivisionerResult, NotUsed] = Provisioner().toFlow() 
    val executor: Flow[PrivisionerResult, ExecutorResult, NotUsed] = Executor().toFlow() 
    val evaluator: Flow[ExecutorResult, EvaluatorResult, NotUsed] = Evaluator().toFlow() 
    val reporter: Sink[EvaluatorResult, Future[Done]] = Reporter().toSink() 

    val graphResult = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => 
    import GraphDSL.Implicits._ 
    producer ~> validator ~> provisioner ~> executor ~> evaluator ~> reporter 

    ClosedShape 
    }) 
    printLine("The graph pipeline was created") 
    graphResult 
} 

答えて

1

あなたreporterシンクはすでに、すべてのあなたの要素が処理されているいくつかのコードを実行したい場合は、にフックすることができますFuture[Done]に実体化。

しかし、現時点ではグラフに公開していません。グラフのDSLを使用して、それを公開する方法がありますが、あなたのケースで、達成するために流暢なDSLを使用することも簡単です:あなたは、グラフを実行したときに

val graphResult: RunnableGraph[Future[Done]] = producer 
    .via(validator) 
    .via(provisioner) 
    .via(executor) 
    .via(evaluator) 
    .toMat(reporter)(Keep.right) 

これはバックFuture[Done]あなたを与えるだろう

val result: Future[Done] = graph.run() 

これで、あなたはフックすることができます。

result.onComplete { 
    case Success(_) => println("Success!") 
    case Failure(_) => println("Failure..") 
} 
関連する問題