私はファイルのリストを持っています。私は欲しい:ストリーム:複数のファイルを読む
- すべてのものを単一のソースとして読む。
- ファイルは順番に順番に読み取られる必要があります。 (ラウンドロビンなし)
- ファイルが完全にメモリに格納されている必要はありません。
- ファイルを読み込む際にエラーが発生すると、ストリームが折りたたまれます。
これが動作しなければならないようにそれは感じた:
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
)
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_)))
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
(スカラ、アッカ・ストリームv2.4.7)しかし、それはFileIO
は、それに関連付けられたマテリアライズド値を有しているので、コンパイルエラーが発生し、そしてSource.combine
doesnのそれをサポートしていません。
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
.mapMaterializedValue(f => NotUsed.getInstance())
)
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_)))
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
しかし、実行時にIllegalArgumentExceptionをスロー:
java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [MergePreferred.out] must correspond to the inlets [MergePreferred.preferred] and outlets [MergePreferred.out]
私はモジュラーを探していたので、それを感謝します。私は、ファイルで何かできることの例として行数を使用していました。書かれた 'lineCounter'は、それをファイルの読み込みに合わせています。 (それはシンクです)しかし、私がフォールドとそれ以外のすべてを別の場所に移動すると、私はFlow [Path、String、NotUsed]が残っています。これはまさに私が探していたものです。 – randomstatistic
あなたの例を輸入してください、彼らはコードの不可欠な部分です。 –
@OsskarWerrewkaそれはすべてakka.stream.scaladslとjava IO/NIOになければなりません。あなたはそれに問題がありましたか? –