2016-09-26 4 views
2

再生フレームワークIterateeを使用してファイルを読み取ります。私はチャンクでこのファイルチャンクを処理したいと思います(各ステップごとに)。Play Iterateを使用してプロセスの各ステップでチャンクでファイルチャンクを読み込んで処理する方法

私は、次の手順作曲:groupByLinesを定義するには

  • groupByLines: Enumeratee[Array[Byte], List[String]]
  • turnIntoLines: Enumeratee[List[String], List[Line]](私はcase class Line(number: Int, value: String)を定義した)
  • parseChunk: Iteratee[List[Line], Try[List[T]]](例えばCSV解析)

を、私が使用する必要がありますIteratee.fold前のチャンクの最後のラインを現在のチャンクの最初のものと連結する。

問題は、これがファイルのすべての行を含む単一のチャンクを作成することです。

しかし、チャンクでファイルチャンクを処理したいと思います。つまり、groupByLinesは200行のチャンクを生成するはずです(たとえば)。

turnIntoLineと同じ問題が発生します。また、foldを使用してラインを作成します。私はライン番号とラインの内容を圧縮するためにアキュムレータ(foldで提供)を使用する必要があります。

私はプレイiterateeとbegginnerです。ここで

は私のコードです:

val chunkSize = 1024 * 8 

val enumerator: Enumerator[Array[Byte]] = Enumerator.fromFile(file, chunkSize) 

def isLastChunk(chunk: Array[Byte]): Boolean = { 
    chunk.length < chunkSize 
} 

val groupByLines: Enumeratee[Array[Byte], List[String]] = Enumeratee.grouped { 
    println("groupByLines") 
    Iteratee.fold[Array[Byte], (String, List[String])]("", List.empty) { 
    case ((accLast, accLines), chunk) => 
     println("groupByLines chunk size " + chunk.length) 
     new String(chunk) 
     .trim 
     .split("\n") 
     .toList match { 
     case lines @ Cons(h, tail) => 
      val lineBetween2Chunks: String = accLast + h 

      val goodLines = 
      isLastChunk(chunk) match { 
       case true => Cons(lineBetween2Chunks, tail) 
       case false => Cons(lineBetween2Chunks, tail).init 
      } 

      (lines.last, accLines ++ goodLines) 
     case Nil => ("", accLines) 
     } 
    }.map(_._2) 
} 


val turnIntoLines: Enumeratee[List[String], List[Line]] = Enumeratee.grouped { 
    println("turnIntoLines") 
    Iteratee.fold[List[String], (Int, List[Line])](0, List.empty) { 
    case ((index, accLines), chunk) => 
     println("turnIntoLines chunk size " + chunk.length) 
     val lines = 
     ((Stream from index) zip chunk).map { 
      case (lineNumber, content) => Line(lineNumber, content) 
     }.toList 
     (index + chunk.length, lines ++ accLines) 
    }.map(_._2) 
} 

答えて

0

ここでの問題は、プレイIterateesを使用して行毎にファイルをどのように処理するか、です。

まず、に、UTF-8使用してファイルを読み取るIを用いる:

object EnumeratorAdditionalOperators { 
    implicit def enumeratorAdditionalOperators(e: Enumerator.type): EnumeratorAdditionalOperators = new EnumeratorAdditionalOperators(e) 
} 

class EnumeratorAdditionalOperators(e: Enumerator.type) { 

    def fromUTF8File(file: File, chunkSize: Int = 1024 * 8): Enumerator[String] = 
    e.fromFile(file, chunkSize) 
     .map(bytes => new String(bytes, Charset.forName("UTF-8"))) 

} 

そして、('\n'でカット)行に分割入力チャンクに:

object EnumerateeAdditionalOperators { 
    implicit def enumerateeAdditionalOperators(e: Enumeratee.type): EnumerateeAdditionalOperators = new EnumerateeAdditionalOperators(e) 
} 

class EnumerateeAdditionalOperators(e: Enumeratee.type) { 

    def splitToLines: Enumeratee[String, String] = e.grouped(
    Traversable.splitOnceAt[String,Char](_ != '\n') &>> 
     Iteratee.consume() 
) 

} 

第三に、 に行番号を追加すると、ここに見つかったコードがhttps://github.com/michaelahlers/michaelahlers-playful/blob/master/src/main/scala/ahlers/michael/playful/iteratee/EnumerateeFactoryOps.scalaになりました。私はEnumeratorEnumerateeにメソッドを「追加」する暗黙の定義された

class EnumerateeAdditionalOperators(e: Enumeratee.type) { 

    /** 
    * As a complement to [[play.api.libs.iteratee.Enumeratee.heading]] and [[play.api.libs.iteratee.Enumeratee.trailing]], allows for inclusion of arbitrary elements between those from the producer. 
    */ 
    def joining[E](separators: Enumerator[E])(implicit ec: ExecutionContext): Enumeratee[E, E] = 
    zipWithIndex[E] compose Enumeratee.mapInputFlatten[(E, Int)] { 

     case Input.Empty => 
     Enumerator.enumInput[E](Input.Empty) 

     case Input.El((element, index)) if 0 < index => 
     separators andThen Enumerator(element) 

     case Input.El((element, _)) => 
     Enumerator(element) 

     case Input.EOF => 
     Enumerator.enumInput[E](Input.EOF) 

    } 

    /** 
    * Zips elements with an index of the given [[scala.math.Numeric]] type, stepped by the given function. 
    * 
    * (Special thanks to [[https://github.com/eecolor EECOLOR]] for inspiring this factory with his answer to [[https://stackoverflow.com/a/27589990/700420 a question about enumeratees on Stack Overflow]].) 
    */ 
    def zipWithIndex[E, I](first: I, step: I => I)(implicit ev: Numeric[I]): Enumeratee[E, (E, I)] = 
    e.scanLeft[E](null.asInstanceOf[E] -> ev.minus(first, step(ev.zero))) { 
     case ((_, index), value) => 
     value -> step(index) 
    } 

    /** 
    * Zips elements with an incrementing index of the given [[scala.math.Numeric]] type, adding one each time. 
    */ 
    def zipWithIndex[E, I](first: I)(implicit ev: Numeric[I]): Enumeratee[E, (E, I)] = zipWithIndex(first, ev.plus(_, ev.one)) 

    /** 
    * Zips elements with an incrementing index by the same contract [[scala.collection.GenIterableLike#zipWithIndex zipWithIndex]]. 
    */ 
    def zipWithIndex[E]: Enumeratee[E, (E, Int)] = zipWithIndex(0) 

    // ... 

} 

注意。このトリックでは、たとえば、次のように書くことができます。Enumerator.fromUTF8File(file)。すべて一緒に入れて

case class Line(number: Int, value: String) 


Enumerator.fromUTF8File(file) &> 
Enumeratee.splitToLines ><> 
Enumeratee.zipWithIndex ><> Enumeratee.map{ 
    case (e, idx) => Line(idx, e) 
} // then an Iteratee or another Enumeratee 

新しいコードは、はるかに簡単かつ簡潔な質問に与えられたものよりもです。

関連する問題