0

私はUbuntu 16.04.2 LTSとPython 3.5.2を使用しています。 私はThreadPoolからmultiprocessingまで仕事をしています。 今私は仕事をしている間にこのプールを終了したいと思います。Python 3:マルチプロセッシングでのスレッドの終了ThreadPoolが機能しない

ThreadPool.terminate()を使用すると、期待どおりに機能しません。 次の例を実行すると、ワーカーは作業を停止することはなく、プログラムはThreadPool.join()コールを超えて実行されることはありません。

import time 
from multiprocessing.pool import ThreadPool 

def task(): 
    try: 
     while True: 
      print("Working") 
      time.sleep(1) 
    except: # Unsuccessful attempt. :(
     print("Working, stopping now") 

thread_pool = ThreadPool(processes=1) 

thread_pool.apply_async(task) 
time.sleep(1) # Make sure the task is actually started. 

print("Terminating") 
thread_pool.terminate() 
print("Termination: Initiated") 
thread_pool.join() # Does not return. 
print("Termination: Done") 

私は間違っていますか?

答えて

1

具体的にはではなくThreadPoolをリクエストしました。つまり、新しいプロセスを作成する代わりに、multiprocessingコードがローカルスレッドを作成します(独自のプロセスで)。

ThreadPoolインスタンスは、それに対して定義されたメカニズムがないため(システムがos.killを呼び出すことができるプロセス)、突然終了することはできません。あなたのコードでは、.terminate指令は無視されています。実際には、プールインスタンスがタスクが戻るときにチェックするフラグを設定しますが、タスクは返されません。 --delay 5を実行したとき

$ python3 tp.py 
Working on 0 with i = 0 
Working on 0 with i = 1 
Terminating 
Termination: Initiated 
Working on 0 with i = 2 
Termination: Done 

と、この::また

$ python3 tp.py --delay 5 
Working on 0 with i = 0 
Working on 0 with i = 1 
Working on 0 with i = 2 
Working on 1 with i = 0 
Working on 1 with i = 1 
Terminating 
Working on 1 with i = 2 
Termination: Initiated 
Termination: Done 

、あなたの場合

import argparse 
import sys 
import time 

from multiprocessing.pool import Pool, ThreadPool 

def task(arg): 
    for i in range(3): 
     print("Working on", arg, "with i =", i) 
     time.sleep(1) 

def main(): 
    parser = argparse.ArgumentParser() 
    parser.add_argument('--delay', default=1, type=float) 
    args = parser.parse_args() 

    thread_pool = ThreadPool(processes=1) 

    thread_pool.apply_async(task, (0,)) 
    thread_pool.apply_async(task, (1,)) 
    time.sleep(args.delay) 

    print("Terminating") 
    thread_pool.terminate() 
    print("Termination: Initiated") 
    thread_pool.join() # Does not return. 
    print("Termination: Done") 

if __name__ == '__main__': 
    try: 
     sys.exit(main()) 
    except KeyboardInterrupt: 
     sys.exit('\nInterrupted') 

引数なしで実行すると、この処理を行います。私たちは、改訂版でこれを見ることができますThreadPoolの代わりにPoolを使用すると、実際のプロセスが得られ、いつでも.terminateを使用することができます(普通のcコミュニケーション・キューをねじ込むことについて)

+0

ありがとうございました。私はスレッドの代わりにプロセスを使用する実際のマルチプロセッシングの実装について知っていましたが、私のアプリケーションにスレッドを使用する必要があります。しかし、私は 'ThreadPool.terminate()'の実装が実際にはそうであることは知らなかった。私自身が終了通信機能を実装しなくても使える 'ThreadPool'の代替手段はありますか?タスクで例外をキャッチするようなことをしなければならないのはもちろんです。 – mxscho

+0

Pythonが提供するレベルよりも低いレベルになると、特定のスレッドにシグナルを送ることができます。メカニズムは(明らかに)マシンに依存します。いくつかのシステム(確かにFreeBSD)のスレッドシグナルハンドリングには、あなたのPythonで回避策があるかもしれないさまざまなバグがあり、それがさらに干渉する可能性があるので、あなたのシステム上で動作することを実験する必要があります。これを行うための移植可能な方法は、共有する「停止する時間」を設定することです。タスク内で定期的にチェックしてください。 – torek

関連する問題