2017-03-14 7 views
3

Python concurrent.futuresとProcessPoolExecutorは、タスクのスケジュールと監視のためのきれいなインタフェースを提供します。先物にもprovide .cancel()メソッド:Python:concurrent.futures取り消し可能にするには?

はキャンセル():コールをキャンセルしよう。呼び出しが現在実行されているであり、取り消すことができない場合このメソッドはFalseを返します。そうでない場合は呼び出しがキャンセルされ、メソッドはTrueを返します。

は残念ながらsimmilar question(に関するasyncio)でタスクを実行している答えの主張は、ドキュメントのこのスニップを使用してuncancelableですが、ドキュメントは、彼らが実行していて、uncancelableされている場合にのみ、それを言ういけません。

プロセスにmultiprocessing.Eventsを送信するとも自明のことはできません(multiprocess.Processのようにパラメータを介して、そうすることはRuntimeErrorを返す)

私は何をしようとしていますか?私は検索スペースを分割し、すべてのパーティションに対してタスクを実行したいと思います。しかし、それは1つのソリューションを持つだけで十分です。そのプロセスはCPUを大量に消費します。 ProcessPoolを使用して利益を相殺しない、これを実現する実際の快適な方法はありますか?

例:私はあなたの質問が面白い見つけ

from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait 

# function that profits from partitioned search space 
def m_run(partition): 
    for elem in partition: 
     if elem == 135135515: 
      return elem 
    return False 

futures = [] 
# used to create the partitions 
steps = 100000000 
with ProcessPoolExecutor(max_workers=4) as pool: 
    for i in range(4): 
     # run 4 tasks with a partition, but only *one* solution is needed 
     partition = range(i*steps,(i+1)*steps) 
     futures.append(pool.submit(m_run, partition)) 

    done, not_done = wait(futures, return_when=FIRST_COMPLETED) 
    for d in done: 
     print(d.result()) 

    print("---") 
    for d in not_done: 
     # will return false for Cancel and Result for all futures 
     print("Cancel: "+str(d.cancel())) 
     print("Result: "+str(d.result())) 
+0

あなたはPARAMとしてそれを渡すのではなく、グローバル変数に 'Event'を設定しようとすることができ、http://stackoverflow.com/questions/1675766/how-to-combine-pool-map-withを参照してください-array-shared-memory-in-python-multiprocessing – niemmi

+0

@niemmi tippに感謝します。おそらくこれを回避策として試してみましょう。別のモジュールへの呼び出しでうまく設計されていないからです。 – Ketzu

答えて

2

はそうここに私の発見です。

.cancel()の動作がpythonのマニュアルに記載されているとおりです。実行中の並行機能に関しては、残念ながら、そのように指示された後でもキャンセルすることはできませんでした。私の発見が正しいとすれば、Pythonはより効果的な.cancel()メソッドを必要とします。

私の所見を確認するには、以下のコードを実行してください。

from concurrent.futures import ProcessPoolExecutor, as_completed 
from time import time 

# function that profits from partitioned search space 
def m_run(partition): 
    for elem in partition: 
     if elem == 3351355150: 
      return elem 
      break #Added to terminate loop once found 
    return False 

start = time() 
futures = [] 
# used to create the partitions 
steps = 1000000000 
with ProcessPoolExecutor(max_workers=4) as pool: 
    for i in range(4): 
     # run 4 tasks with a partition, but only *one* solution is needed 
     partition = range(i*steps,(i+1)*steps) 
     futures.append(pool.submit(m_run, partition)) 

    ### New Code: Start ### 
    for f in as_completed(futures): 
     print(f.result()) 
     if f.result(): 
      print('break') 
      break 

    for f in futures: 
     print(f, 'running?',f.running()) 
     if f.running(): 
      f.cancel() 
      print('Cancelled? ',f.cancelled()) 

    print('New Instruction Ended at = ', time()-start) 
print('Total Compute Time = ', time()-start) 

更新: 強制的にbashを介して、並行プロセスを終了させることは可能ですが、結果は主要なPythonプログラムがあまりにも終了するということです。これが問題でない場合は、以下のコードを試してみてください。

これを表示するには、最後の2つの印刷文の間に次のコードを追加する必要があります。注:このコードは、他のpython3プログラムを実行していない場合にのみ機能します。

import subprocess, os, signal 
result = subprocess.run(['ps', '-C', 'python3', '-o', 'pid='], 
         stdout=subprocess.PIPE).stdout.decode('utf-8').split() 
print ('result =', result) 
for i in result: 
    print('PID = ', i) 
    if i != result[0]: 
     os.kill(int(i), signal.SIGKILL) 
     try: 
      os.kill(int(i), 0) 
      raise Exception("""wasn't able to kill the process 
           HINT:use signal.SIGKILL or signal.SIGABORT""") 
     except OSError as ex: 
      continue 
1

concurrent.futures.Future.kill()メソッドを持っていない理由を私は知らないが、あなたはpool.shutdown(wait=False)とプロセスプールをシャットダウンし、手で、残りの子プロセスを殺すことであなたが望むものを達成することができます。

子プロセスを殺すために関数を作成します:あなたが最初の結果を得るまで

import signal, psutil 

def kill_child_processes(parent_pid, sig=signal.SIGTERM): 
    try: 
     parent = psutil.Process(parent_pid) 
    except psutil.NoSuchProcess: 
     return 
    children = parent.children(recursive=True) 
    for process in children: 
     process.send_signal(sig) 

その後、残りのすべての子プロセスを殺す、あなたのコードを実行します。

from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait 

# function that profits from partitioned search space 
def m_run(partition): 
    for elem in partition: 
     if elem == 135135515: 
      return elem 
    return False 

futures = [] 
# used to create the partitions 
steps = 100000000 
pool = ProcessPoolExecutor(max_workers=4) 
for i in range(4): 
    # run 4 tasks with a partition, but only *one* solution is needed 
    partition = range(i*steps,(i+1)*steps) 
    futures.append(pool.submit(m_run, partition)) 

done, not_done = wait(futures, timeout=3600, return_when=FIRST_COMPLETED) 

# Shut down pool 
pool.shutdown(wait=False) 

# Kill remaining child processes 
kill_child_processes(os.getpid()) 
1

残念ながら、Futuresはキャンセルできません実行しています。コアとなるのは、同じAPIを異なる実装で使用することです(実行中のスレッドやコルーチンを中断することはできません)。

Pebbleライブラリは、この制限および他の制限を克服するように設計されています。

from pebble import ProcessPool 

def function(foo, bar=0): 
    return foo + bar 

with ProcessPool() as pool: 
    future = pool.schedule(function, args=[1]) 

    # if running, the container process will be terminated 
    # a new process will be started consuming the next task 
    future.cancel() 
関連する問題