2017-03-09 8 views
2

Reading a CSV files using Akka Streams - この質問に基づいています。 私は、Akka Streamsを使用してCSVを読み込んでいます。 これを行単位で処理する必要がありますが、ヘッダーの名前がわかっている必要があります。 オプションはありますか?Akkaストリーム:ヘッダ付きのCSV処理

UPD。 ビットを明確にする。この場合には、ヘッダーを - -

FileIO.fromPath(Paths.get("a.csv)) 
.via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String)) 
.runForeach(println /* header + current line for each line*/) 
+0

は、ヘッダはになりませんあなたのCSVの最初の行ですか?あなたが直面している具体的な問題の詳細を追加できますか? –

+0

私は必要なサンプルを追加しました。 – Rumoku

答えて

6

あなたは最初の要素を取るためにprefixAndTailを使用することができ、その後、後続の要素(行)となまけそれを組み合わせます。ここで

Map[String, String]を形成するために結合ヘッダと列の例です:

val flow: Flow[Seq[String], Map[String, String], NotUsed] = Flow[Seq[String]] 
    .prefixAndTail(1).flatMapConcat { case (headers, rows) => 
    rows.map (row => headers.head.zip(row).toMap) 
    } 

val test: Source[Seq[String], NotUsed] = Source(
    List(Seq("col1", "col2"), Seq("a", "b"), Seq("1", "2"))) 

Await.result(test.via(flow).runForeach(println), 20.seconds) 
// Map(col1 -> a, col2 -> b) 
// Map(col1 -> 1, col2 -> 2) 
1

Alpakka、アッカストリームコネクタのコレクション、CSV supportを提供しています。

Source 
    .single(ByteString("""header1,header2,header3 
         |1,2,3 
         |4,5,6""".stripMargin)) 
    .via(CsvParsing.lineScanner()) 
    .via(CsvToMap.toMap()) 
    .map(_.mapValues(_.utf8String)) 
    .runForeach(println) 

// Map(header1 -> 1, header2 -> 2, header3 -> 3) 
// Map(header1 -> 4, header2 -> 5, header3 -> 6) 
+0

このライブラリは素晴らしいです。 – flybonzai

関連する問題