0
私は愚かな質問を持っていますが、原因を知っていることができませんでした:代替(の削減)
import akka.{Done, NotUsed}
import akka.actor.Status.Success
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.scaladsl.{Flow, RunnableGraph, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import scala.concurrent.Future
object Generic {
def main(args: Array[String]) {
implicit val system = ActorSystem("system")
implicit val mat = ActorMaterializer()
val sink: Sink[Any, Future[Done]] = Sink.foreach(x => println("Ans =====> " + x))
val counts = Flow[String]
.mapConcat(x => x.split("\\s").toList)
.filter(!_.isEmpty)
.groupBy(Int.MaxValue, identity)
.map(x => x -> 1)
.reduce((l, r) => (l._1, l._2 + r._2))
.mergeSubstreams
val fold: Flow[String, Int, NotUsed] = Flow[String].map(x => 1).fold(0)(_ + _)
val words: RunnableGraph[ActorRef] = Source.actorRef(Int.MaxValue, OverflowStrategy.fail)
.via(counts)
.to(sink)
val ref = words.run()
for {
ln <- scala.io.Source.stdin.getLines.takeWhile(_ != "-1")
} {
println("---> Message sent " + ln)
ref ! ln
}
ref ! Success("end")
Thread.sleep(5000)
system.terminate()
}
}
それは非常にシンプルなものを行います。申請端末で、I入力文を。単語を抽出し、各単語の頻度を維持します。そして期待どおりに動作します。問題は次のとおりです。
- ソースは無限のストリームです。つまり、ソースを終了したときにのみ、出力を出力します。終了する代わりに常にライブ統計を出力するようにプログラムをリファクタリングすることはできますか?この動作は、
reduce
reduce
の中にprintステートメントを持つことによって行われると考えられます。しかし、別のシンクにライブセンテンスを別のシンクに送る(放送経由で送る)ようなことができますか?