2017-12-31 263 views
1

thats私の最初の質問stackoverflow。私は主にここで私が知る必要があるものを見つけることができました。このbtwのために多くのありがとう。Python Processpoolexecutor - キューを強制終了しますか?

ただし、私がProcessPoolExecutorをkillしようとすると、生成されたキュー全体を処理するだけです(私はそう思いますか?)。 Processpoolexecutorのキューをすぐにクリーンアップする簡単な方法はありますか?

from concurrent.futures import ProcessPoolExecutor 
from time import sleep 
from random import randint 


def something_fancy(): 
    sleep(randint(0, 5)) 
    return 'im back!' 


class Work: 
    def __init__(self): 
     self.exe = ProcessPoolExecutor(4) 

    def start_procs(self): 
     for i in range(300): 
      t = self.exe.submit(something_fancy) 
      t.add_done_callback(self.done) 

    def done(self, f): 
     print f.result() 

    def kill(self): 
     self.exe.shutdown() 


if __name__ == '__main__': 
    work_obj = Work() 
    work_obj.start_procs() 
    sleep(5) 
    work_obj.kill() 

だから私がしたいのは、4つのプロセスによって処理される300のキューを生成することです。 5秒後、それはただ終了するはずです。

私はgil btwのためにプロセスを使用する必要があります。

答えて

0

shutdown(wait=False)を使用すると、より高速に戻ります。 waitのデフォルト値はTrueです。そうでない場合は、.Cancel()も提供され、cancleableでない場合はFalseを返します。保留中のすべての先物が実行完了するまでTrueある

wait場合、このメソッドは戻りませんし、エグゼキュータに関連したリソースがされています

link to the doku

それはまだかかわらず、すべてのprocessiontを終了します解放された。

waitFalseの場合、このメソッドは即座に戻り、保留中のすべての未処理の実行が完了すると、エグゼキュータに関連付けられたリソースは解放されます。 waitの値にかかわらず、すべての保留中の未来が実行されるまで、Pythonプログラム全体は終了しません。あなたは時間の一定量を持っている場合は

は、あなたがタイムアウトを提供する必要があります

map(func, *iterables, timeout=None, chunksize=1) 

フロートまたはINTすることができます - しかし、それはmsか何かであればドキュメンタリーは教えてくれありません。 ..私はリストに先物を追加し、手動でキューサイズを調整することによって、すべてのプロセスをキャンセルすることができたヒントで

0

おかげパトリック

。それがなければ、まだ開始されている多くのプロセスがあります。

キューサイズを調整したり、実行を一時停止したり、プロセスキューを削除したりするためのAPIがないようです。

しかし、これを実現する唯一の方法は、メインオブジェクトをスレッドで実行して、メインスクリプトがいつでもそれを強制終了できるようにすることです。そして、私はまだ "CancelledError"をキャッチしようとしています。

かなり汚いと思われ、私のためにpythonicではありません。私は他の提案をします。どうもありがとう。

from concurrent.futures import ProcessPoolExecutor, CancelledError 
from time import sleep 
from random import randint 
from threading import Thread 


def something_fancy(): 
    sleep(randint(0, 5)) 
    return 'im back!' 


class Work: 
    def __init__(self): 
     self.exe = ProcessPoolExecutor(4) 
     self.futures = [] 
     self.max_queue = 50 
     self.killed = False 

    def start_procs(self): 
     for i in range(200000): 
      while not self.killed: 
       if len(self.futures) <= self.max_queue: 
        t = self.exe.submit(something_fancy) 
        t.add_done_callback(self.done) 
        self.futures.append(t) 
        break 

    def done(self, f): 
     print f.result() 
     self.futures.remove(f) 

    def kill(self): 
     self.killed = True 
     for future in self.futures: 
      try: 
       future.cancel() 
      except CancelledError, e: 
       print e 


if __name__ == '__main__': 
    work_obj = Work() 
    Thread(target=work_obj.start_procs).start() 
    sleep(5) 
    work_obj.kill() 

edit

from concurrent.futures import ProcessPoolExecutor, CancelledError 
from time import sleep 
from random import randint 
from threading import Thread 


def something_fancy(): 
    sleep(0.5) 
    return 'Hello World, Future was running!' 


class Work: 
    def __init__(self): 
     cpu_usage = 4 
     self.exe = ProcessPoolExecutor(cpu_usage) 
     self.futures = [] 
     self.max_queue = cpu_usage*3 
     self.stop = False 
     self.paused = False 

    def start_procs(self): 
     for i in range(200000): 
      while not self.stop: 
       if len(self.futures) <= self.max_queue: 
        if not self.paused: 
         t = self.exe.submit(something_fancy) 
         t.add_done_callback(self._done) 
         self.futures.append(t) 
         break 

    def _done(self, f): 
     print f.result() 
     self.futures.remove(f) 

    def pause(self): 
     self.paused = False if self.paused else True 

    def shutdown(self): 
     self.stop = True 
     for future in self.futures: 
      try: 
       future.cancel() 
      except CancelledError, e: 
       print e 


if __name__ == '__main__': 
    work_obj = Work() 
    Thread(target=work_obj.start_procs).start() 
    print 'Started' 
    sleep(5) 
    work_obj.pause() 
    print 'Paused' 
    sleep(5) 
    work_obj.pause() 
    print 'Continue' 
    sleep(5) 
    work_obj.shutdown() 
    print 'Shutdown' 

動作すること - まだCancelledErrorとまだかなり汚れをキャッチしません。

関連する問題