2016-06-13 7 views
5

私はファイルのリストを持っています。私は欲しい:ストリーム:複数のファイルを読む

  1. すべてのものを単一のソースとして読む。
  2. ファイルは順番に順番に読み取られる必要があります。 (ラウンドロビンなし)
  3. ファイルが完全にメモリに格納されている必要はありません。
  4. ファイルを読み込む際にエラーが発生すると、ストリームが折りたたまれます。

これが動作しなければならないようにそれは感じた:

val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath) 
    .via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true)) 
    .map(bs => bs.utf8String) 
) 
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_))) 
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines 

(スカラ、アッカ・ストリームv2.4.7)しかし、それはFileIOは、それに関連付けられたマテリアライズド値を有しているので、コンパイルエラーが発生し、そしてSource.combine doesnのそれをサポートしていません。

val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath) 
    .via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true)) 
    .map(bs => bs.utf8String) 
    .mapMaterializedValue(f => NotUsed.getInstance()) 
) 
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_))) 
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines 

しかし、実行時にIllegalArgumentExceptionをスロー:

java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [MergePreferred.out] must correspond to the inlets [MergePreferred.preferred] and outlets [MergePreferred.out] 

答えて

8

異なる懸念事項を明確にモジュール化するために、以下のコードはそれほど簡潔ではありません。

// Given a stream of bytestrings delimited by the system line separator we can get lines represented as Strings 
val lines = Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true).map(bs => bs.utf8String) 

// given as stream of Paths we read those files and count the number of lines 
val lineCounter = Flow[Path].flatMapConcat(path => FileIO.fromPath(path).via(lines)).fold(0l)((count, line) => count + 1).toMat(Sink.head)(Keep.right) 

// Here's our test data source (replace paths with real paths) 
val testFiles = Source(List("somePathToFile1", "somePathToFile2").map(new File(_).toPath)) 

// Runs the line counter over the test files, returns a Future, which contains the number of lines, which we then print out to the console when it completes 
testFiles runWith lineCounter foreach println 
+0

私はモジュラーを探していたので、それを感謝します。私は、ファイルで何かできることの例として行数を使用していました。書かれた 'lineCounter'は、それをファイルの読み込みに合わせています。 (それはシンクです)しかし、私がフォールドとそれ以外のすべてを別の場所に移動すると、私はFlow [Path、String、NotUsed]が残っています。これはまさに私が探していたものです。 – randomstatistic

+0

あなたの例を輸入してください、彼らはコードの不可欠な部分です。 –

+1

@OsskarWerrewkaそれはすべてakka.stream.scaladslとjava IO/NIOになければなりません。あなたはそれに問題がありましたか? –

-1

私は1つの答えを持っています離れマテリアライズド値をマッピング

は、私は、ファイル読み込みエラーの処理を受けるが、コンパイルんどのように思ってしまいますゲートの外 - akka.FileIOを使用しないでください。これは正常に動作しているようです(例:

val sources = Seq("sample.txt", "sample2.txt").map(io.Source.fromFile(_).getLines()).reduce(_ ++ _) 
val source = Source.fromIterator[String](() => sources) 
val lineCount = source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) 

)さらに良い解決策があるかどうかを知りたいと思います。

+0

'io.Source'を使うことで、多くのパワーを失います。小さなファイルの場合、これはうまくいくかもしれませんが、大きなファイルではオプションではありません。 – jarandaf

+0

@ jarandafあなたは明確にできますか?私はio.Sourceがちょうどフードの下でBufferedReaderを使用していて、getLinesイテレータがファイル全体を一度に読み込んでいないという印象を受けました。 – randomstatistic

+0

あなたは正しいかもしれませんが( 'FileIO'は' String'の代わりに 'ByteString'を扱いますが、これはより演奏するためのものです)。一方、 'io.Source'では、ソースを閉じるために常に注意が必要です(これはデフォルトでは行われません)。 – jarandaf

2

更新私はページを更新していなかったので、ああ、私は<> _受け入れ答えを見ていません。私はエラー処理に関するいくつかの注意を追加したので、これをここに残しておきます。

私は次のプログラムは、あなたが何をしたいん信じる:

import akka.NotUsed 
import akka.actor.ActorSystem 
import akka.stream.{ActorMaterializer, IOResult} 
import akka.stream.scaladsl.{FileIO, Flow, Framing, Keep, Sink, Source} 
import akka.util.ByteString 
import scala.concurrent.{Await, Future} 
import scala.util.{Failure, Success} 
import scala.util.control.NonFatal 
import java.nio.file.Paths 
import scala.concurrent.duration._ 

object TestMain extends App { 
    implicit val actorSystem = ActorSystem("test") 
    implicit val materializer = ActorMaterializer() 
    implicit def ec = actorSystem.dispatcher 

    val sources = Vector("build.sbt", ".gitignore") 
    .map(Paths.get(_)) 
    .map(p => 
     FileIO.fromPath(p) 
     .viaMat(Framing.delimiter(ByteString(System.lineSeparator()), Int.MaxValue, allowTruncation = true))(Keep.left) 
     .mapMaterializedValue { f => 
      f.onComplete { 
      case Success(r) if r.wasSuccessful => println(s"Read ${r.count} bytes from $p") 
      case Success(r) => println(s"Something went wrong when reading $p: ${r.getError}") 
      case Failure(NonFatal(e)) => println(s"Something went wrong when reading $p: $e") 
      } 
      NotUsed 
     } 
    ) 
    val finalSource = Source(sources).flatMapConcat(identity) 

    val result = finalSource.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) 
    result.onComplete { 
    case Success(n) => println(s"Read $n lines total") 
    case Failure(e) => println(s"Reading failed: $e") 
    } 
    Await.ready(result, 10.seconds) 

    actorSystem.terminate() 
} 

ここで重要なのはflatMapConcat()方法であって、それが元にストリームの各要素を変換した場合、これらのソースによって得られた要素のストリームを返します。それらは順番に実行されます。

エラーの処理に関しては、mapMaterializedValue引数でハンドラを将来に追加するか、ハンドラをSink.foreachの未来の値に設定して実行中のストリームの最終エラーを処理することができます。私は上記の例で両方を行い、存在しないファイルでテストすると、同じエラーメッセージが2回出力されることがわかります。残念ながら、flatMapConcat()はマテリアライズされた値を収集しておらず、率直に言って私がそれをうまくやることができないので、必要に応じて別々に処理しなければなりません。

関連する問題