2017-05-19 13 views
3

daskを使用して、結果をリレーショナルデータベースに保存する5000個のバッチタスクを処理したい場合は、すべて完了した後、 databseを照会し、(AWS S3に保存されます)結果ファイルを生成しますDASK - 実行中にワーカーを停止すると、完了したタスクが2回起動されます。

だから、多かれ少なかれ、このようなものだ:

from dask import bag, delayed batches = bag.from_sequence(my_batches()) results = batches.map(process_batch_and_store_results_in_database) graph = delayed(read_database_and_store_bundled_result_into_s3)(results) client = Client('the_scheduler:8786') client.compute(graph)

そして、これは動作しますが、:処理の終わり近く多くの作業者がアイドル状態にあり、AWS EC2をオフにすることができます(AWS EC2でお金を節約できます)。しかし、スケジューラは、それらのタスクがすでに完了していることを「忘れて」残りの作業者に対してそれらのタスクを再度実行しようとします。

read_database_and_store_bundled_result_into_s3を開始する前にDaskがすべての結果を追跡しようとしているので、これは実際には機能であり、バグではないことを理解していますが、分散処理グラフをオーケストレーションするようdaskに指示する方法はありますか国家管理について心配しないでください。

+1

この問題は、何らかの形でこれに関連すると思われる:https://github.com/dask/distributed/issues/847も関連 –

+0

:http://stackoverflow.com/questions/41965253/repeated-task-execution -used-the-distributed-dask-scheduler/41965766#41965766 –

答えて

1

完了後に未来を忘れることをお勧めします。このソリューションは、dask.bagではなくdask.distributed concurrent.futuresインタフェースを使用します。特に、イテレータはas_completedです。

from dask.distributed import Client, as_completed 
client = Client('the_scheduler:8786') 

futures = client.map(process_batch_and_store_results_in_database, my_batches()) 

seq = as_completed(futures) 
del futures # now only reference to the futures is within seq 

for future in seq: 
    pass # let future be garbage collected 
関連する問題