2017-06-06 4 views
2

非常に大きなDAGを構築して、分散スケジューラに提出します。ノードは、非常に大きなデータフレームで動作します。 1つのパターンは、データをロードし、それぞれ数百MBの(そして論理的に1つのテーブルのパーティションを表す)パンダのデータフレームを構築する約50〜60の関数を持っていることです。データの移動を最小限に抑えながら、これらをグラフ内の下流ノードの単一のdaskデータフレームに連結したいと思います。ダークグラフの実行とメモリの使用

dfs = [dask.delayed(load_pandas)(i) for i in disjoint_set_of_dfs] 
dfs = [dask.delayed(pandas_to_dask)(df) for df in dfs] 
return dask.delayed(concat_all)(dfs) 

どこ

def pandas_to_dask(df): 
    return dask.dataframe.from_pandas(df).to_delayed() 

と私は様々なconcat_all implentationsを試してみましたが、これは合理的なようだ:

def concat_all(dfs): 
    dfs = [dask.dataframe.from_delayed(df) for df in dfs] 
    return dask.dataframe.multi.concat(dfs, axis='index', join='inner') 

すべてのパンダのデータフレームは、その上の互いに素である私は、このようなタスクをリンクインデックスとソート/単調性。

しかし、私はこのconcat_allの機能で死に至った労働者を殺しています(それぞれのメモリ予算を超えると、クラスタマネージャはそれらを殺しています)。実際にはメモリの予算はかなりですが、動くとは思っていませんデータの周り。私は合理的に、私は常にデータの合理的なサブセットにスライスしてから、compute()を呼び出す前にdaskデータフレームを使用していることを確信しています。

--memory-limitでこれまで成功していません。私は問題に少なくとも正しくアプローチしていますか?私は行方不明の問題はありますか?パンダのデータフレーム

>>> dfs = [dask.delayed(load_pandas)(i) for i in disjoint_set_of_dfs] 
>>> type(dfs[0].compute()) # just checking that this is true 
pandas.DataFrame 

に計算する遅延値のリストを考えると

答えて

2

が、これは、メタデータ(列を決定するために、第1の演算を実行しますデフォルトではdask.dataframe.from_delayed機能に

>>> ddf = dd.from_delayed(dfs) 

を渡しますdask.dataframeにとって重要な名前、dtypeなど)。これを回避するには、サンプルのデータフレームを作成し、meta=キーワードに渡す。

>>> meta = pd.DataFrame({'value': [1.0], 'name': ['foo'], 'id': [0]}) 
>>> ddf = dd.from_delayed(dfs, meta=meta) 

example notebookも役立ちます。

通常、(from_pandasコールを遅らせることによって)他のdask関数からdask関数を呼び出す必要はありません。 Dask.dataframe関数自体はすでに怠惰であり、さらに遅延させる必要はありません。

+0

お返事ありがとうございます。私はdd.from_delayed(dfs)がすぐにメタデータを抽出するために 'dfs [0]'を評価することを観察します。何らかの理由で、これが私に問題を引き起こしています。グラフが完全に構築されるまでこの評価を延期する別の方法がありますか?私はレプロをまとめようとします。 –

+0

'meta ='キーワードにデータフレームの例を与えることができます。私は答えの例を追加します。 – MRocklin

関連する問題