dask.compute(...)はブロッキング呼び出しであると予想されます。しかし、dask.computeをネストし、内側のものが(dask.dataframe.read_parquetのように)I/Oを実行すると、内部のdask.computeはブロックされません。私はのように、それぞれ8つのプロセスと2人の労働者を開始した場合ネストされたdask.computeはブロックしません。
import dask, distributed
def outer_func(name):
files = find_files_for_name(name)
df = inner_func(files).compute()
# do work with df
return result
def inner_func(files):
tasks = [ dask.dataframe.read_parquet(f) for f in files ]
tasks = dask.dataframe.concat(tasks)
return tasks
client = distributed.Client(scheduler_file=...)
results = dask.compute([ dask.delay(outer_func)(name) for name in names ])
:ここでは擬似コードの例です
dask-worker --scheduler-file $sched_file --nprocs 8 --nthreads 1
、その後、私が実行して、最大で2×8同時inner_funcを期待inner_func(ファイル)ので、 .compute()がブロックされているはずです。しかし、私が観察したことは、あるworkerプロセス内で、read_parquetステップを開始するとすぐに、別のinner_func(files).compute()が実行される可能性があるということでした。したがって、最終的に複数のinner_func(ファイル).compute()が実行され、いつかはメモリ不足のエラーが発生する可能性があります。
これが期待どおりの動作ですか?もしそうなら、ワーカープロセスごとに1つのinner_func(files).compute()を実行する方法はありますか?
ここに少し混乱があるようです。 dask.dataframeは遅延オブジェクトを作成しますが、遅延/計算された関数内でこれらを作成/計算するのは正常ではありません。この関数がワーカーに送信されているとします。計算はどこで実行されると思いますか? – mdurant
この例の入れ子は、実世界のデータフローIMHOでは非常に典型的です。このようなネストを避けるため、dask DataFrameのような分散データ構造を扱うことは、実際には実現可能/望ましいことではありません。 dask DataFrame APIはpandasよりも小さく、動作するシリアルコードバージョンを維持することが非常に重要であるためです。 inner_funcは、dask-workerプロセス内で複数のスレッドで実行されているようですが、例えばworkerごとに1つのスレッドしか指定しません。 dask-worker --scheduler-file sched.json --nprocs 3 - -nthreads 1 --local-directory/tmp / – user1527390