2016-10-24 17 views
6

私はcsvファイルを読んでいます。私はAkka Streamsを使ってこれを行い、各行で実行するアクションのグラフを作成できるようにしています。私は次のおもちゃの例を持って走っています。Akkaストリームを使用したCSVファイルの読み込み

def main(args: Array[String]): Unit = { 
    implicit val system = ActorSystem("MyAkkaSystem") 
    implicit val materializer = ActorMaterializer() 

     val source = akka.stream.scaladsl.Source.fromIterator(Source.fromFile("a.csv").getLines) 
     val sink = Sink.foreach(println) 
     source.runWith(sink) 
     } 

2つのSourceタイプは私と一緒に楽に座りません。これは慣用か、これを書く良い方法がありますか?

答えて

2

これは違うのでうん、いいですよ。Source sです。しかし、あなたは(時には我々は例えばソースcsvファイルはzip形式で行う必要がある)自分でファイルを読み込んでとApache Commons CSVを使用することを検討していると述べた特定のInputStream

このような
StreamConverters.fromInputStream(() => input) 
    .via(Framing.delimiter(ByteString("\n"), 4096)) 
    .map(_.utf8String) 
    .collect { line => 
    line 
    } 

を使用してそれを解析することができますscala.io.Source好きではない場合アクアストリーム。あなたはコードを書くことが少なくなるかもしれません:)

+0

こんにちは、私はこの例を数分間実行しようとしていましたが、私は成功しませんでした。不足している輸入品やセットアップを提供してください。 –

10

実際には、akka-streamsはファイルから直接読み込む機能を提供します。

FileIO.fromPath(Paths.get("a.csv")) 
     .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String)) 
     .runForeach(println) 

ここで、runForeachの方法は、行を印刷することです。これらの行を処理するために適切なSinkがある場合は、この関数の代わりに使用してください。たとえば、あなたが'で行を分割し、その中の単語の総数を印刷する場合:アッカストリームでCSVファイルを読み込むための

val sink: Sink[String] = Sink.foreach(x => println(x.split(",").size)) 

FileIO.fromPath(Paths.get("a.csv")) 
     .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String)) 
     .to(sink) 
     .run() 
+0

ええ、先日私はこれを見てきました。私はおそらくそれを使用します。私はPureCSVライブラリを使用しようとしましたが、ストリームベースのアプローチを使用する目的を敗北させる処理の前にメモリ内のすべてのファイルを読み込みます。 –

4

慣用的な方法は、Alpakka CSV connectorを使用することです。次の例では、CSVファイルを読み込む列名(ファイルの最初の行とする)とByteString値のマップに変換し、String値にByteString値を変換し、各ラインを印刷:

FileIO.fromPath(Paths.get("a.csv")) 
    .via(CsvParsing.lineScanner()) 
    .via(CsvToMap.toMap()) 
    .map(_.mapValues(_.utf8String)) 
    .runForeach(println) 
関連する問題