2017-11-30 11 views
2

よく記載されているように、Daskはreset_indexが呼び出されたときにパーティションごとに厳密に増加するインデックスを作成し、セット全体でインデックスが重複します。 Daskで厳密に増加するインデックスを作成する最も良い方法(例えば、計算上最も速い)は、セット全体にわたって連続している必要はありません。私はmap_partitionsがパーティション番号を渡すことを望んでいたが、私はそうは思わない。ありがとう。Dask:厳密に増加するインデックスを作成する

EDIT

おかげ@MRocklinは、私がここまで持っているが、私は、元のデータフレームと私のシリーズを再結合する方法について少し援助を必要とします。

def create_increasing_index(ddf:dd.DataFrame): 
    mps = int(len(ddf)/ddf.npartitions + 1000) 
    values = ddf.index.values 

    def do(x, max_partition_size, block_id=None): 
     length = len(x) 
     if length == 0: 
      raise ValueError("Does not work with empty partitions. Consider using dask.repartition.") 

     start = block_id[0] * max_partition_size 
     return da.arange(start, start+length, chunks=1) 

    series = values.map_blocks(do, max_partition_size=mps, dtype=np.int64) 
    ddf2 = dd.concat([ddf, dd.from_array(series)], axis=1) 
    return ddf2 

私はエラー「とValueError:不明部門の指定軸= 1でデータフレームを連結することができません」を取得しています。 dd.concatを使うよりも良い方法がありますか?ありがとう。 AGAIN

EDIT

実は、私の目的(と私がテストしていたデータの量 - わずか数ギガバイト)のためCUMSUMが十分に速いです。これが遅すぎるときは私は再訪するでしょう!これを達成する

答えて

2

かなり遅いの方法は、新しい列を作成し、これはどちらも非常に遅いことも、それは自由であるcumsum

ddf['x'] = 1 
ddf['x'] = ddf.x.cumsum() 
ddf = ddf.set_index('x', sorted=True) 

を使用することです。

あなたの質問がどのように表現されているかを考えてみると、最大の行数よりも大きいことがわかっている非常に大きな値で区切られた各パーティションの範囲を作成するだけです。 map_partitionsはパーティション番号を提供していません。代わりに以下の2つのソリューションのいずれかを実行できます。

  1. 、(.valuesで)dask.arrayに変換ブロックインデックスを提供しmap_blocks方法を使用し、次いで、dd.from_arrayバック系列に変換します。
  2. 遅れシリーズを自分で作成、dask.delayedオブジェクトのリストに変換し、その後、dd.from_delayed

http://dask.pydata.org/en/latest/delayed-collections.html

でDASKシリーズに戻って変換します
関連する問題