2017-09-02 12 views
0

私はあなたのステージの1つで、InputStreamを返す呼び出しを行う必要がある状況を処理する方法を理解しようとしています。ここで、そのストリームをステージのソースとして扱いますそれはさらに下に来る。ストリーム内のAkkaストリームストリーム

Source.map(e => Calls that return an InputStream) 
.via(processingFlow).runwith(sink.ignore) 

私は要素は入力ストリームから来たものとして処理フローに行くことにしたいと思います。これは基本的に私がファイルをテーリングし、各行を読んでいる行です。私はCLI APIに対して呼び出しに必要な呼び出しに関する情報を与えてくれます。その呼び出しを行うときは、StdoutをInputStreamとして取得します。結果。結果はほとんどの場合膨大な時間になるので、私は だけをメモリ内のすべてのものを収集することができます。

M

答えて

1
  • あなたはjava.ioストリームからSource sおよびSink秒を取得するためにStreamConvertersユーティリティを使用することができます。詳細情報here
  • flatMapConcatまたはflatMapMergeを使用して、Sourceのストリームを1つのストリームにまとめることができます。詳細情報here

簡単な例は次のようになります。

val source: Source[String, NotUsed] = ??? 
    def gimmeInputStream(name: String): InputStream = ??? 
    val processingFlow: Flow[ByteString, ByteString, NotUsed] = ??? 

    source 
    .map(gimmeInputStream) 
    .flatMapConcat(is ⇒ StreamConverters.fromInputStream(() ⇒ is, chunkSize = 8192)) 
    .via(processingFlow) 
    .runWith(Sink.ignore) 

しかしアッカストリームはFileIOオブジェクト内のファイルに対する読み取り/書き込みするために多くの慣用的なDSLを提供しています。詳細情報here

例は次のようになります。

val source: Source[String, NotUsed] = ??? 
    val processingFlow: Flow[ByteString, ByteString, NotUsed] = ??? 

    source 
    .flatMapConcat(name ⇒ FileIO.fromPath(Paths.get(name))) 
    .via(processingFlow) 
    .runWith(Sink.ignore) 
関連する問題