2017-07-19 7 views
0

私は愚かな質問を持っていますが、原因を知っていることができませんでした:代替(の削減)

import akka.{Done, NotUsed} 
import akka.actor.Status.Success 
import akka.actor.{ActorRef, ActorSystem} 
import akka.stream.scaladsl.{Flow, RunnableGraph, Sink, Source} 
import akka.stream.{ActorMaterializer, OverflowStrategy} 

import scala.concurrent.Future 


object Generic { 
    def main(args: Array[String]) { 

    implicit val system = ActorSystem("system") 
    implicit val mat = ActorMaterializer() 

    val sink: Sink[Any, Future[Done]] = Sink.foreach(x => println("Ans =====> " + x)) 

    val counts = Flow[String] 
     .mapConcat(x => x.split("\\s").toList) 
     .filter(!_.isEmpty) 
     .groupBy(Int.MaxValue, identity) 
     .map(x => x -> 1) 
     .reduce((l, r) => (l._1, l._2 + r._2)) 
     .mergeSubstreams 

    val fold: Flow[String, Int, NotUsed] = Flow[String].map(x => 1).fold(0)(_ + _) 

    val words: RunnableGraph[ActorRef] = Source.actorRef(Int.MaxValue, OverflowStrategy.fail) 
     .via(counts) 
     .to(sink) 

    val ref = words.run() 

    for { 
     ln <- scala.io.Source.stdin.getLines.takeWhile(_ != "-1") 
    } { 
     println("---> Message sent " + ln) 
     ref ! ln 
    } 
    ref ! Success("end") 
    Thread.sleep(5000) 
    system.terminate() 
    } 
} 

それは非常にシンプルなものを行います。申請端末で、I入力文を。単語を抽出し、各単語の頻度を維持します。そして期待どおりに動作します。問題は次のとおりです。

  • ソースは無限のストリームです。つまり、ソースを終了したときにのみ、出力を出力します。終了する代わりに常にライブ統計を出力するようにプログラムをリファクタリングすることはできますか?この動作は、reduce

reduceの中にprintステートメントを持つことによって行われると考えられます。しかし、別のシンクにライブセンテンスを別のシンクに送る(放送経由で送る)ようなことができますか?

答えて

1

コンビネータscanを見てください。それはあなたにfold/reduceの集約力を与えるが、それは中間結果を放出する。また

// .reduce((l, r) => (l._1, l._2 + r._2)) 
     .scan("" → 0)((l, r) => (l._1, l._2 + r._2)) 

ロギングSinkに出力を送信する場合、あなたは効果的な選択の側Sinkにブロードキャストを行うであろう、alsoToに見ることができます。