2017-07-25 9 views
2

私はパーティション化されたコレクションの内容を連想ではない集計関数と順番に集約したいので、Bag.foldまたはBag.reductionを使用することはできません。daskバッグの内容を順番に集める方法は?

あり、この操作を行うように見えるBag.accumulateですが、それはだけでなく、最終的な集合体の一部のパーティションごとの中間結果でバッグを返します。

>>> import dask.bag as db 
>>> 
>>> def collect(acc, e): 
...  if acc is None: 
...   acc = list() 
...  acc.append(e) 
...  return acc 
... 
>>> b = db.from_sequence(range(10), npartitions=3) 
>>> b.accumulate(collect, initial=None).compute() 
[None, 
[0, 1, 2, 3], 
[0, 1, 2, 3], 
[0, 1, 2, 3], 
[0, 1, 2, 3], 
[0, 1, 2, 3, 4, 5, 6, 7], 
[0, 1, 2, 3, 4, 5, 6, 7], 
[0, 1, 2, 3, 4, 5, 6, 7], 
[0, 1, 2, 3, 4, 5, 6, 7], 
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]] 

は基本的に私はの最後の要素でのみ興味がありますaccumulateの出力と私は中間ステップのコピーをメモリに保存したくありません。

答えて

3

現在のところ、バッグには順次縮小操作はありませんが、それは可能です。これを実現する簡単な方法は、上記のようにaccumulateを使用することですが、最後のパーティションの最後の要素のみを要求します。私たちは、確かに私は `to_delayed`(または単に最初からループで` delayed`呼び出しですべてのものを書くこと)について考えたが、私は思っていたBag.to_delayed

acc = b.accumulate(collect, initial=None) 
partitions = acc.to_delayed() 
partitions[-1][-1].compute() 
+0

を使用して遅延値に袋を変換することによって、比較的容易にこれを行うことができますもしそれを行うためのより多くの "控え目な"方法がなければ。 – ogrisel

+0

このような場合には、実際には 'Bag.reduce'メソッドが必要です。一般的には、dask.delayedに切り替えると、 "dask-ish"のようになります。ユーザーが遭遇するすべての状況を予測できるツールはありません。 "use dask.delayed"はおそらく、#dask-labeled stack overflowの約30%に対する答えです。 – MRocklin

関連する問題