2017-05-15 5 views
0

私は入力ファイルから各行を読み込んで単純なシンクを適用するためのファイルパーサーを作成するために、Akka FileIO(scala)を使用しています。各行は改行( '\ n')文字で区切られていますが、ファイルの最後の行はEOFで終わります。AKKA FileIOストリームの改行とEOFへの変換

最終的な '/ n'文字に依存せずに最終行を確実に読み取れるように、改行と改行の両方を処理するにはどうすればよいですか?

var rowNum = 0 
    val simpleMsgSink: Sink[String, Future[Done]] = 
     Sink.foreach { 
     case row: String => { 
      println(s"$rowNum: $row") 
      rowNum = rowNum+1 
     } 
     } 
    val source = FileIO.fromPath(file, 1 * 1024 * 1024) 
     .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024)) 
     .map(_.utf8String) 
     .runWith(simpleMsgSink) 

これは、ファイル(最後の行の末尾に改行なし)に対して実行された場合:

Sensor_ID,Location,Seqno,gwrx.time,Temp,Humidity,Noise,CO2,Water 
A0890,"51.645368, 0.072211",1,42793.00278,16,48,36,325,0 
A0891,"51.645370, 0.072300",1,42793.00278,15,41,34,353,3 

出力は次のとおりです。

0: Sensor_ID,Location,Seqno,gwrx.time,Temp,Humidity,Noise,CO2,Water 
1: A0890,"51.645368, 0.072211",1,42793.00278,16,48,36,325,0 

がどのように私はピックアップ最後の行?

+0

感謝。私も2.4.16を実行しています。スニペットは3行の出力を返しますか? 3行目の最後に余分な\ nを追加していないと確信していますか? –

答えて

0

Framing.delimiterscala docを見ると、実際には3番目のパラメータがallowTruncationで、デフォルト値がfalseであることがわかります。ここscaladocはそれについて言っているのです:

false場合、復号化される最後のフレームが有効な区切り文字が含まれていないときに、このフローではなく切り捨てられたフレームを返すストリームを失敗しました。

だから、あなたがしなければならないすべては、不足しているパラメータを追加している:、@chunjefそれをテストするための

Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true) 
+0

それは完璧です!私は既にそれを「間違っている」と判断し、エラーをキャッチしようとしましたが、allowTrunction = trueは私が欲しいと思うだけです。ありがとう! –

関連する問題