2017-01-31 12 views
4

シナリオ - 私は仕事を提供しているプロセスプールを持っています。ただし、タスクの実行中にサブプロセスが終了した場合、AsyncResultオブジェクトは準備完了としてマークされません。私が期待していたことは、それが準備ができて失敗であるとマークされるということでした。 AsyncResultが準備ができていないかどうかはどうすればわかりますか?

はこれを再現するには、次の

>>> import multiprocessing 
>>> import time 
>>> p = multiprocessing.Pool(processes=1) 
>>> result = p.apply_async(time.sleep, args=(1000,)) 
>>> result.ready() 
False 

を別のシェルでは、プロセスIDを見つけて、それを殺します。

>>> result.ready() 
False 
>>> result.wait(5) # Waits 5 seconds even though subprocess is dead 

これは、スレッドが終了するまで待っているスレッドがあり、通常はかなり長いタイムアウトがあるために発生します。タイムアウトを待つことなく、result.wait(timeout)コールを終了するにはどうすればよいですか?また、それは放棄されていると私はどのように通知し、タスクがまだ実行されているだけでなく、私たちはタイムアウトに達した?

答えて

1

Pebbleライブラリは、プロセスが予期せず終了した場合に通知します。タイムアウトとコールバックもサポートしています。

あなたの例です。

from pebble import ProcessPool 
from concurrent.futures import TimeoutError 

with ProcessPool() as pool: 
    future = pool.schedule(time.sleep, args=(1000,), timeout=100) 

    try: 
     results = future.result() 
     print(results) 
    except TimeoutError as error: 
     print("Function took longer than %d seconds" % error.args[1]) 
    except ProcessExpired as error: 
     print("%s. Exit code: %d" % (error, error.exitcode)) 
    except Exception as error: 
     print("function raised %s" % error) 
     print(error.traceback) # Python's traceback of remote process 

documentationの例です。

+0

https://github.com/noxdafox/pebble - あなたはライブラリを書いたと思いますか? –

+0

はい、私はしました。タイムアウトやクラッシュを処理できる実装がないため、プールがライブラリに追加されました。限られた時間しか保証されないCPU集約型アプリケーションを実行する場合、これは特に重要です。その他のユースケースの例は、アプリケーション全体をクラッシュさせるランダムなsegfaultsを持つCライブラリのPythonバインディングです。 – noxdafox

1

を呼び出すと、プールからの信号を受信しない限り、timeoutに達するまで待機します。ただし、kill -9 [pid]を実行すると、プールはすぐにキュー内の次のジョブを開始します。

このように、それを利用して手動で "ポーリング"してready()をチェックする方が簡単です。あなたが述べたように、ジョブが終了すると、ready()Falseのままです。

これを修正するには、pidが有効かどうかを確認します。 ApplyResultにはpidが含まれていないので、取得するには他の手段が必要です。

def test(identifier): 
    pid = os.getpid() 

    f = open("pids/" + str(pid), "w") 
    f.write(str(identifier)) 
    f.close() 

    # do stuff 
    time.sleep(1000) 

次に、この(jobs = []を考慮)のようなジョブを作成する:あなたはこのような何かを行うことができます。

job = (identifier, pool.apply_async(test, (identifier,))) 
jobs.append(job) 

識別子が必要しかし、あなたは後でApplyResultがどのPIDに属するかを把握したい場合に便利ですされていません。各PID-という名前のファイルの内容を考慮し

def is_alive(pid): 
    return os.path.exists("/proc/" + str(pid)) 

for pid in os.listdir("pids"): 
    if is_alive(pid): 
     ... 
    else: 
     ... 

各ジョブ(PID)が生きている場合は、すべてのジョブを取得し、確認することができます。次にidentifierjobsに見つけたら、ApplyResultがどのpidに属しているかをリンクして、特定のジョブが死んでいるかどうかを確認したり、ready()をチェックしたりすることができます。


パイプを作成して子プロセスをフォークすることもできます。

r, w = os.pipe() 

def child(): 
    global r, w 

    data = ... 
    time.sleep(100) 

    os.close(r) 
    w = os.fdopen(w, "w") 
    w.write(data) 
    w.close() 

次に、親プロセスにデータを書き戻します。

def parent(child_pid): 
    global r, w 

    os.close(w) 

    r = os.fdopen(r) 
    data = r.read() 
    r.close() 

    status = os.waitpid(child_pid, 0) 
    if status == 0: 
     # Everything is fine 
    elif status == 9: 
     # kill -9 [pid] 

    # Process data 

その後、statusdataは子プロセスに何が起こったのかを判断するために、受信し利用することができます。

あなたはすべてを始めて起動します。あなたの質問から

if __name__ == "__main__": 
    child_pid = os.fork() 
    if child_pid: 
     parent(child_pid) 
    else: 
     child() 

私は、Unixを想定しています。もし私を修正する気にしないでください。また、非Python 2.7が答えに潜入した場合、私は謝罪します。

関連する問題