再生フレームワークIterateeを使用してファイルを読み取ります。私はチャンクでこのファイルチャンクを処理したいと思います(各ステップごとに)。Play Iterateを使用してプロセスの各ステップでチャンクでファイルチャンクを読み込んで処理する方法
私は、次の手順作曲:groupByLines
を定義するには
groupByLines: Enumeratee[Array[Byte], List[String]]
turnIntoLines: Enumeratee[List[String], List[Line]]
(私はcase class Line(number: Int, value: String)
を定義した)parseChunk: Iteratee[List[Line], Try[List[T]]]
(例えばCSV解析)
を、私が使用する必要がありますIteratee.fold
前のチャンクの最後のラインを現在のチャンクの最初のものと連結する。
問題は、これがファイルのすべての行を含む単一のチャンクを作成することです。
しかし、チャンクでファイルチャンクを処理したいと思います。つまり、groupByLines
は200行のチャンクを生成するはずです(たとえば)。
turnIntoLine
と同じ問題が発生します。また、fold
を使用してラインを作成します。私はライン番号とラインの内容を圧縮するためにアキュムレータ(fold
で提供)を使用する必要があります。
私はプレイiterateeとbegginnerです。ここで
は私のコードです:
val chunkSize = 1024 * 8
val enumerator: Enumerator[Array[Byte]] = Enumerator.fromFile(file, chunkSize)
def isLastChunk(chunk: Array[Byte]): Boolean = {
chunk.length < chunkSize
}
val groupByLines: Enumeratee[Array[Byte], List[String]] = Enumeratee.grouped {
println("groupByLines")
Iteratee.fold[Array[Byte], (String, List[String])]("", List.empty) {
case ((accLast, accLines), chunk) =>
println("groupByLines chunk size " + chunk.length)
new String(chunk)
.trim
.split("\n")
.toList match {
case lines @ Cons(h, tail) =>
val lineBetween2Chunks: String = accLast + h
val goodLines =
isLastChunk(chunk) match {
case true => Cons(lineBetween2Chunks, tail)
case false => Cons(lineBetween2Chunks, tail).init
}
(lines.last, accLines ++ goodLines)
case Nil => ("", accLines)
}
}.map(_._2)
}
val turnIntoLines: Enumeratee[List[String], List[Line]] = Enumeratee.grouped {
println("turnIntoLines")
Iteratee.fold[List[String], (Int, List[Line])](0, List.empty) {
case ((index, accLines), chunk) =>
println("turnIntoLines chunk size " + chunk.length)
val lines =
((Stream from index) zip chunk).map {
case (lineNumber, content) => Line(lineNumber, content)
}.toList
(index + chunk.length, lines ++ accLines)
}.map(_._2)
}