2016-08-22 14 views
1

ThreadPoolExecutorを使用して高価なジョブを作成するPythonコードがいくつかあります。どのシステムが完了したのかを把握したいので、このシステムを再起動する必要がある、私はすでに完了したものをやり直す必要はありません。シングルスレッドのコンテキストでは、私が行ったことを棚に記しておくことができます。マルチスレッド環境でのこのアイデアのナイーブなポートは次のとおりです。PythonでThreadPoolExecutorのコンテキストで軽い永続性

from concurrent.futures import ThreadPoolExecutor 
import subprocess 
import shelve 


def do_thing(done, x): 
    # Don't let the command run in the background; we want to be able to tell when it's done 
    _ = subprocess.check_output(["some_expensive_command", x]) 
    done[x] = True 


futs = [] 
with shelve.open("done") as done: 
    with ThreadPoolExecutor(max_workers=18) as executor: 
     for x in things_to_do: 
      if done.get(x, False): 
       continue 
      futs.append(executor.submit(do_thing, done, x)) 
      # Can't run `done[x] = True` here--have to wait until do_thing finishes 
     for future in futs: 
      future.result() 

    # Don't want to wait until here to mark stuff done, as the whole system might be killed at some point 
    # before we get through all of things_to_do 

私はこれをどうして取り除けますか? documentation for shelveにはスレッドの安全性に関する保証が含まれていないので、私は考えません。

これを処理する簡単な方法は何ですか?おそらくdone[x] = Truefuture.add_done_callbackに貼ると思いますが、that will often run in the same thread as the future itselfです。おそらく、ThreadPoolExecutorでうまく動作するロック機構がありますか?それは、眠っているループを書いて、完成した先物をチェックすることが私にとってはよりクリーンなようです。

答えて

1

は普通のpythonは、コンテキストマネージャを閉じたときに、それが唯一のディスクに書き込まれるオブジェクト - あなたは最も外側のwithコンテキストマネージャにまだいる間、done棚があり、それはその__exit__メソッドを実行します。したがって、(CPythonを使用している限り)GILのため、他のPythonオブジェクトと同じようにスレッドセーフです。

具体的には、再割り当てdone[x] = Trueはスレッドセーフであり、アトミックに行われます。

シェルフの__exit__メソッドは、Ctrl-Cの後に実行されますが、Pythonプロセスが突然終了し、シェルフがディスクに保存されない場合は表示されません。

この種の障害から保護するため、私はsqllite3のような軽量なファイルベースのスレッドセーフなデータベースを使用することをお勧めします。

+0

コンテキストマネージャの '__exit__'は、例えばKeyboardInterrupt例外の場合でも呼び出されるため、私の状態は永続化されるようです。スレッドの安全性については、GILのためにすべてのPythonオブジェクトがスレッドセーフであると言っていますか? – kuzzooroo

+0

すべてのPythonオブジェクトはスレッドセーフですが、(少なくともCPythonでは)I/Oを含まない基本的なアトミック操作(代入/再割り当て)を使用しています(writeは '__exit__'の後に行われます)。あなたは安全です。 – Julien

+0

私はあなたのコードが少し間違っていることを追加したいと思っています。あなたは 'do_thing'を呼び出すのではなく、最初の引数として渡すべきです。また、 'executor.submit'の戻り値を(通常は' futs'と呼ばれる)リストに格納する必要があります。次に、 'ThreadPoolExecutor'コンテキスト内で、各オブジェクトの' result() 'メソッドを呼び出すリストをループします。これにより、インタプリタがすべてのタスクが完了するまで続行できなくなります。 – Julien

関連する問題