2016-09-01 13 views
0

Answer this question Odomontoisは、あらかじめソートされたストリームをキーでグループ化できるレイジーグループバイ演算子を、メモリにすべて格納することなくグループ化する方法を示しました。 Akkaのストリーム(つまりソースオブジェクト)でこれを行う方法はありますか?代わりに、Akmaソースから通常のStreamオブジェクトを引き出す方法があるので、私はOdomontoisのchopByを使用できますか?Akka StreamsでchopByを実装する

は、ここでは動作しません。これを行うには全く失敗します:

implicit class SourceChopOps[T, NU](s: Source[T, NU]) { 
    def chopBy[U](f: T => U) = { 
     s.prefixAndTail(1) 
     .map(pt => (pt._1.head, pt._2)) 
     .map { 
      case (prefix, tail) => 
      // what to do with pulled off head??? 
      tail.takeWhile(e => f(e) == f(prefix)) ++ tail.dropWhile(e => f(e) == f(prefix)).chopBy(f) // fails here 
     } 
     } 
    } 
    } 
+0

あなたは公式の文書をチェックしましたか? http://doc.akka.io/docs/akka/2.4.9/scala/stream/stream-cookbook.html#implementing-reduce-by-key – fGo

+0

info @fGoありがとうございます。 Akkaグループは、ほとんどの中間データをメモリに保持する必要性をどうにかして回避しますか?返す前にすべてのサブストリームのデータを保持する必要がありますか?それとも、本当にきちんとしたフロー制御トリックでこれをやる必要がないのですか?この前の状態は、chopByの背後にある主な推進力でした。一度に1つのキーだけメモリ内のデータを保持するだけです。 – ChoppyTheLumberjack

答えて

0

groupByをアッカストリームにあなたがメモリにすることにより、グループ化されているキーを保持しますが、としてストリーム面積常に「怠惰」彼らは背圧を持っているので、限られたメモリで実行されます。下流が新しい要素を受け入れない場合、新しい要素は上流に生成されません。

したがって、たとえば:

case class Record(id: Int) 
Source.fromIterator(() => 
    Iterator 
     .fill(1000)(Iterator(1,2).map { n => println("creating"); Record(n) }) 
     .flatten) 
    .groupBy(maxSubstreams = 2, _.id) 
    .map { r => println("Consuming"); r } 
    .fold(0)((acc, _) => acc + 1) 
    .mergeSubstreams 
    .runForeach(println) 

Recordインスタンスが早く、彼らは2つのサブではなく、先行それらのすべてのそれぞれで消費することができようにして製造されている方法を紹介します。

関連する問題