2017-10-24 12 views
0

Iveはこのような数千の行を含むファイルを取得しました。Akka Streams:ファイルソースの調整

Mr|David|Smith|[email protected] 
Mrs|Teri|Smith|[email protected] 
... 

私は、各行をダウンストリームに放出するが、抑制された方法でファイルを読みたい。 1 /秒。

私は、流れの中で調整がどのように働くかを理解できません。

flow1(下記)は、1秒後に最初の行を出力して終了します。
flow2(以下)は、1秒間待機してからファイル全体を出力します。

val source: Source[ByteString, Future[IOResult]] = FileIO.fromPath(file) 

val flow1 = Flow[ByteString]. 
       via(Framing.delimiter(ByteString(System.lineSeparator),10000)). 
       throttle(1, 1.second, 1, ThrottleMode.shaping). 
       map(bs => bs.utf8String) 

val flow2 = Flow[ByteString]. 
       throttle(1, 1.second, 1, ThrottleMode.shaping). 
       via(Framing.delimiter(ByteString(System.lineSeparator), 10000)). 
       map(bs => bs.utf8String) 

val sink = Sink.foreach(println) 
val res = source.via(flow2).to(sink).run().onComplete(_ => system.terminate()) 

私はdocsを勉強しても解決できませんでした。

どのポインタにも大いに感謝します。 flow1

答えて

1

使用runWith、代わりのto、:

val source: Source[ByteString, Future[IOResult]] = FileIO.fromPath(file) 

val flow1 = 
    Flow[ByteString] 
    .via(Framing.delimiter(ByteString(System.lineSeparator), 10000)) 
    .throttle(1, 1.second, 1, ThrottleMode.shaping) 
    .map(bs => bs.utf8String) 

val sink = Sink.foreach(println) 

source.via(flow1).runWith(sink).onComplete(_ => system.terminate()) 
+0

完璧、おかげでジェフ! – Nio

+0

だから私の流れは意図したとおりに動作していたと思いますが、シンクを使って間違ってデバッグしていました。 – Nio

関連する問題