ThreadPoolExecutorを使用して高価なジョブを作成するPythonコードがいくつかあります。どのシステムが完了したのかを把握したいので、このシステムを再起動する必要がある、私はすでに完了したものをやり直す必要はありません。シングルスレッドのコンテキストでは、私が行ったことを棚に記しておくことができます。マルチスレッド環境でのこのアイデアのナイーブなポートは次のとおりです。PythonでThreadPoolExecutorのコンテキストで軽い永続性
from concurrent.futures import ThreadPoolExecutor
import subprocess
import shelve
def do_thing(done, x):
# Don't let the command run in the background; we want to be able to tell when it's done
_ = subprocess.check_output(["some_expensive_command", x])
done[x] = True
futs = []
with shelve.open("done") as done:
with ThreadPoolExecutor(max_workers=18) as executor:
for x in things_to_do:
if done.get(x, False):
continue
futs.append(executor.submit(do_thing, done, x))
# Can't run `done[x] = True` here--have to wait until do_thing finishes
for future in futs:
future.result()
# Don't want to wait until here to mark stuff done, as the whole system might be killed at some point
# before we get through all of things_to_do
私はこれをどうして取り除けますか? documentation for shelveにはスレッドの安全性に関する保証が含まれていないので、私は考えません。
これを処理する簡単な方法は何ですか?おそらくdone[x] = True
をfuture.add_done_callback
に貼ると思いますが、that will often run in the same thread as the future itselfです。おそらく、ThreadPoolExecutorでうまく動作するロック機構がありますか?それは、眠っているループを書いて、完成した先物をチェックすることが私にとってはよりクリーンなようです。
コンテキストマネージャの '__exit__'は、例えばKeyboardInterrupt例外の場合でも呼び出されるため、私の状態は永続化されるようです。スレッドの安全性については、GILのためにすべてのPythonオブジェクトがスレッドセーフであると言っていますか? – kuzzooroo
すべてのPythonオブジェクトはスレッドセーフですが、(少なくともCPythonでは)I/Oを含まない基本的なアトミック操作(代入/再割り当て)を使用しています(writeは '__exit__'の後に行われます)。あなたは安全です。 – Julien
私はあなたのコードが少し間違っていることを追加したいと思っています。あなたは 'do_thing'を呼び出すのではなく、最初の引数として渡すべきです。また、 'executor.submit'の戻り値を(通常は' futs'と呼ばれる)リストに格納する必要があります。次に、 'ThreadPoolExecutor'コンテキスト内で、各オブジェクトの' result() 'メソッドを呼び出すリストをループします。これにより、インタプリタがすべてのタスクが完了するまで続行できなくなります。 – Julien