非常に大きなDAGを構築して、分散スケジューラに提出します。ノードは、非常に大きなデータフレームで動作します。 1つのパターンは、データをロードし、それぞれ数百MBの(そして論理的に1つのテーブルのパーティションを表す)パンダのデータフレームを構築する約50〜60の関数を持っていることです。データの移動を最小限に抑えながら、これらをグラフ内の下流ノードの単一のdaskデータフレームに連結したいと思います。ダークグラフの実行とメモリの使用
dfs = [dask.delayed(load_pandas)(i) for i in disjoint_set_of_dfs]
dfs = [dask.delayed(pandas_to_dask)(df) for df in dfs]
return dask.delayed(concat_all)(dfs)
どこ
def pandas_to_dask(df):
return dask.dataframe.from_pandas(df).to_delayed()
と私は様々なconcat_all
implentationsを試してみましたが、これは合理的なようだ:
def concat_all(dfs):
dfs = [dask.dataframe.from_delayed(df) for df in dfs]
return dask.dataframe.multi.concat(dfs, axis='index', join='inner')
すべてのパンダのデータフレームは、その上の互いに素である私は、このようなタスクをリンクインデックスとソート/単調性。
しかし、私はこのconcat_all
の機能で死に至った労働者を殺しています(それぞれのメモリ予算を超えると、クラスタマネージャはそれらを殺しています)。実際にはメモリの予算はかなりですが、動くとは思っていませんデータの周り。私は合理的に、私は常にデータの合理的なサブセットにスライスしてから、compute()
を呼び出す前にdaskデータフレームを使用していることを確信しています。
--memory-limit
でこれまで成功していません。私は問題に少なくとも正しくアプローチしていますか?私は行方不明の問題はありますか?パンダのデータフレーム
>>> dfs = [dask.delayed(load_pandas)(i) for i in disjoint_set_of_dfs]
>>> type(dfs[0].compute()) # just checking that this is true
pandas.DataFrame
に計算する遅延値のリストを考えると
お返事ありがとうございます。私はdd.from_delayed(dfs)がすぐにメタデータを抽出するために 'dfs [0]'を評価することを観察します。何らかの理由で、これが私に問題を引き起こしています。グラフが完全に構築されるまでこの評価を延期する別の方法がありますか?私はレプロをまとめようとします。 –
'meta ='キーワードにデータフレームの例を与えることができます。私は答えの例を追加します。 – MRocklin