2016-11-30 8 views
1

私は怠惰なイテレータ[アイテム]を持っているとしましょう。 itemsオブジェクトは、イテレータを反復処理するときにのみ遅延生成されます。アイテムは高価です。Scala:遅延イテレータのコンテンツを「プリロードする」方法?

このイテレータをJSON配列としてシリアル化したいと思います。それは(Jacksonのscalaモジュールで)動作しますが、それは私には十分効率的ではないようです。

私の知る限り理解し、それは現在このように動作します:

  • 計算し、次の項目
  • のSerialize項目
  • 計算し、次の項目
  • のSerialize項目
  • 計算し、次の項目
  • シリアル化アイテム

私はアイテムの計算とアイテムのシリアライズを並行して行いたいと思います。

次のアイテムを読むとき、次のアイテムの定義された量の計算を開始するイテレータが必要です。

たとえば、シーンの背後でiterator.next()を実行すると、反復スレッドがブロックされずに次の50個のアイテムが計算されます(次の要素が利用可能になるまで待ちます)。

私は「BufferedIterator」を見てきましたが、私は本当に、明示的に「ヘッド」を照会したくないとして、それは、私が必要と正確に何ではない、と私は

どれをプリロードするために1つの以上のアイテムを必要としますこれがどのように達成されるかについてのアイデア?

私もストリームでIteratorを置き換える解決するためにOKですが、私はあなたの問題の権利を理解している場合が低いため、メモリ使用量

+0

[GroupedIterator](http://scala-lang.org/api/2.11.8/#scala.collection.Iterator$GroupedIterator)を試しましたか? – laughedelic

+0

@laughedelic私のアイテムが塊で計算されるように、すでに 'inputIterator.grouped(chunkSize).map(computeItemsChunk).flatten'を使用していますが、あなたが提案していることについてはわかりません。 –

+0

" .next()、シーンの後ろに、次の50項目が計算されます。 'GroupedIterator'ではこれが起こります:' .next'を呼び出すたびに、新しいチャンクが計算されます。あなたが望むものではありませんか? – laughedelic

答えて

1

にイテレータのための好みを持って、ここにあなたが何ができるかの例を示します。各アイテムの計算をFutureにラップすることができるので、準備ができたら各チャンクをブロックして処理/シリアル化せずに入力ストリームを繰り返し処理できます。それぞれの作品が評価されるとき、各事が起こっているとき、あなたが見ることができますので、私は、REPLと印刷でこれをするつもりです:

@ import concurrent._, ExecutionContext.Implicits.global 
import concurrent._, ExecutionContext.Implicits.global 

@ def futureItem(i: Int): Future[Int] = Future { 
    Thread.sleep(1000) 
    println(s"item: ${i}") 
    i 
} 
defined function futureItem 

@ val inputIterator = (1 to 9).toIterator.map(futureItem) 
inputIterator: Iterator[Future[Int]] = non-empty iterator 

だから各項目を計算することは、少なくとも1秒かかります。 (私は、オブジェクトの中に定義された

@ case object foo { 
    val chunksIterator = inputIterator.grouped(3).map { futureItems => 
    Future.sequence(futureItems).map(computeItemsChunk) 
    } 
} 
defined object foo 

:、今、私たちのグループの入力ストリーム

@ def computeItemsChunk(items: Seq[Int]): Int = { 
    Thread.sleep(1000) 
    val s = items.sum 
    println(s"chunk ${items}: ${s}") 
    s 
} 
defined function computeItemsChunk 

Future.sequenceを適用し、チャンクを計算します。そして今、我々はまた、いくつかの時間がかかるチャンク、内の項目を処理したいですそうでなければ、グループ化(または何か他のもの)は最初のチャンクの評価を強制するでしょう)。今度は、それが評価される方法を見てみましょう:

@ Await.result(Future.sequence(foo.chunksIterator), Duration.Inf) 
item: 2 
item: 3 
item: 4 
item: 1 
item: 7 
item: 6 
chunk List(1, 2, 3): 6 
item: 5 
item: 8 
chunk List(4, 5, 6): 15 
item: 9 
chunk List(7, 8, 9): 24 
res5: Iterator[Int] = non-empty iterator 

あなたはアイテムが利用可能であり、反復子は、各チャンクの評価を待たずに進み、一旦チャンクが計算されていることがわかります。

+0

あなたのコードは、私がすでに使っているものです(Iterator [Iterator [Item]]を最後に平坦化したものを除きますが、このような実装では問題がありました。あまりにも多くの並列作業を行い、システムをオーバーフローさせますので、一度に1つのチャンクを並行して計算し、最大で 'chunkSize'並列計算があるようにします。 –

+0

私はアイテムを合計のような結果に折りたたむ必要はないことに注意してください( 'computeItemsChunk'は' Seq [Id] 'をとり戻り値を返す必要があります) 'Seq [Item]'など)。すべての計算が開始された場合、私はIteratorまたはStreamの遅延プロパティーが本当に必要なくなったと思われますが、Future [List [Item ]] '結果として十分かもしれない –

+0

だから一般的な考え方はすべての計算を一度に開始したいとは思っていません。反復子のコンシューマが進行するとき(つまり、すべてを一度にロードせず、すべてのアイテムではなく次のイテレータの次のアイテムのみをロードするとき)にのみ計算がトリガされます。 –