2011-06-28 10 views
9

私は、concurrent.futuresモジュールを使用してpython3.2でタイムアウトを取得しようとしています。しかし、タイムアウトすると、実際に実行が停止することはありません。私は両方のスレッドとプロセスプールエグゼキュータを試してみましたが、どちらもタスクを停止せず、終了するまでタイムアウトが発生しません。だから誰でも知っている可能性がある場合は、この作業を取得するには?タイムアウトでconcurrent.futuresを使用するには?

import concurrent.futures 
import time 
import datetime 

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000] 

def run_loop(max_number): 
    print("Started:", datetime.datetime.now(), max_number) 
    last_number = 0; 
    for i in range(1, max_number + 1): 
     last_number = i * i 
    return last_number 

def main(): 
    with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor: 
     try: 
      for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1): 
       print(future.result(timeout=1)) 
     except concurrent.futures._base.TimeoutError: 
      print("This took to long...") 

if __name__ == '__main__': 
    main() 

答えて

15

私が言うことができる限り、実際にTimeoutErrorが発生するのは、予想されるときです。タスクが終了した後ではありません。

ただし、プログラム自体は、実行中のすべてのタスクが完了するまで実行され続けます。これは、現在実行中のタスク(あなたの場合はおそらく、プールサイズがタスクの数と同じように、おそらくすべての提出タスク)が実際には "殺されていない"ためです。

TimeoutErrorが発生するため、タスクが終了するまで待たずに(何か他のことをする)、タスクが完了するまで実行を継続します。あなたのExecutorのスレッド/サブプロセスに未完成のタスクがある限り、Pythonは終了しません。

私が知る限り、現在実行中のFuturesを "停止"することはできません。まだ開始されていないスケジュールされたタスクのみをキャンセルすることができます。あなたの場合、何もありませんが、5つのスレッド/プロセスのプールがあり、100個のアイテムを処理したいと考えています。ある時点では、完了したタスクが20、実行中のタスクが5、スケジュールされたタスクが75になる場合もあります。この場合、76のスケジュールされたタスクをキャンセルすることができますが、実行中の4つのタスクは、結果を待つかどうかに関係なく、完了するまで続きます。

このようにすることはできませんが、最終的な結果を得る方法があるはずです。各「タスク」のための呼び出し可能オブジェクトを作成し、それらを与えることによって

import concurrent.futures 
import time 
import datetime 

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000] 

class Task: 
    def __init__(self, max_number): 
     self.max_number = max_number 
     self.interrupt_requested = False 

    def __call__(self): 
     print("Started:", datetime.datetime.now(), self.max_number) 
     last_number = 0; 
     for i in xrange(1, self.max_number + 1): 
      if self.interrupt_requested: 
       print("Interrupted at", i) 
       break 
      last_number = i * i 
     print("Reached the end") 
     return last_number 

    def interrupt(self): 
     self.interrupt_requested = True 

def main(): 
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(max_numbers)) as executor: 
     tasks = [Task(num) for num in max_numbers] 
     for task, future in [(i, executor.submit(i)) for i in tasks]: 
      try: 
       print(future.result(timeout=1)) 
      except concurrent.futures.TimeoutError: 
       print("this took too long...") 
       task.interrupt() 


if __name__ == '__main__': 
    main() 

:たぶん、このバージョンは、道(それはあなたが望んでいたまさになければわからないが、それはいくつかの使用であるかもしれない)であなたを助けることができます単純な関数ではなくエグゼキュータに渡すことで、タスクを「中断する」方法を提供できます。 ヒント:task.interrupt()行を削除し、何が起こるか見て、それはそれが簡単に;-)

4

上記の私の長い説明を理解するために作ることが最近、私はまた、この問題をヒット、そして最後に、私はProcessPoolExecutorを使用して、次の解決策を考え出す:


def main(): 
    with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor: 
     try: 
      for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1): 
       print(future.result(timeout=1)) 
     except concurrent.futures._base.TimeoutError: 
      print("This took to long...") 
      stop_process_pool(executor) 

def stop_process_pool(executor): 
    for pid, processes in executor._processes.items(): 
     process.terminate() 
    executor.shutdown() 
+0

txmcあなたはプロセスの1つを殺すことができますか?それとも、すべてを殺さなければならないのですか? –

関連する問題