2013-03-20 7 views
7

私はPython 2.7.3を使用しています。サブクラス化されたmultiprocessing.Processオブジェクトを使用して、いくつかのコードを並列化しました。サブクラス化されたProcessオブジェクトのコードにエラーがなければ、すべて正常に動作します。しかし、サブクラス化されたProcessオブジェクトのコードにエラーがあると、それらはサイレントにクラッシュします(親シェルにスタックトレースは出力されません)。CPU使用率はゼロになります。親コードは決してクラッシュしないので、実行がハングしているという印象を与えます。一方、エラーがどこにあるかについての指示がないため、コード内のエラーがどこにあるかを追跡することは本当に困難です。Pythonマルチプロセッシングプロセスがサイレントにクラッシュする

同じ問題を扱うstackoverflowに関するその他の質問はありません。

サブクラス化されたProcessオブジェクトは、親のシェルにエラーメッセージを出力することができないため、静かにクラッシュするようですが、少なくとも私はそれを使ってより効率的にデバッグすることができます私のコードの他のユーザーが問題に遭遇したときに教えてくれるように)。

EDIT:私の実際のコードがあまりにも複雑ですが、その中にエラーが発生してサブクラス化Processオブジェクトの簡単な例は、このようなものになるだろう:これが答えではない

from multiprocessing import Process, Queue 

class Worker(Process): 

    def __init__(self, inputQueue, outputQueue): 

     super(Worker, self).__init__() 

     self.inputQueue = inputQueue 
     self.outputQueue = outputQueue 

    def run(self): 

     for i in iter(self.inputQueue.get, 'STOP'): 

      # (code that does stuff) 

      1/0 # Dumb error 

      # (more code that does stuff) 

      self.outputQueue.put(result) 
+2

プロセスの例外を示すため、このような回避策を示唆? – Blender

+0

@ブレンダーいくつかのコードを追加しました。 – hendra

答えて

12

親プロセスに例外を渡す方法がありますか?それから、あなたが望むだけでそれらを扱うことができます。

concurrent.futures.ProcessPoolExecutorを使用すると、これは自動的です。 multiprocessing.Poolを使用すると、それは簡単です。 ProcessQueueを明示的に使用する場合は、作業を少しずつ行う必要がありますが、それはであり、それほど多くはありません。です。例えば

def run(self): 
    try: 
     for i in iter(self.inputQueue.get, 'STOP'): 
      # (code that does stuff) 
      1/0 # Dumb error 
      # (more code that does stuff) 
      self.outputQueue.put(result) 
    except Exception as e: 
     self.outputQueue.put(e) 

次に、あなたの呼び出し元のコードは、ちょうど何か他のもののようなキューからException Sを読み取ることができます。この代わりに:(私はあなたの最小限のサンプルだけでキューを無視しているため、実際の親プロセスキュー読み取りコードが何をするか知っている。しかし、できればしないでください

result = outq.pop() 
if isinstance(result, Exception): 
    raise result 
yield result 

:。

yield outq.pop() 

これを行います

これは、runになる未処理の例外を中止することを前提としています。これは、実際のコードが実際にはこのように機能しない場合でも考えられます。例外を返信して次のi in iterに進む場合は、tryforの代わりに移動してください。

これはまた、Exceptionが有効な値ではないことを前提としています。それが問題だ場合、最も簡単な解決策はただ(result, exception)タプルプッシュすることです。そして、

def run(self): 
    try: 
     for i in iter(self.inputQueue.get, 'STOP'): 
      # (code that does stuff) 
      1/0 # Dumb error 
      # (more code that does stuff) 
      self.outputQueue.put((result, None)) 
    except Exception as e: 
     self.outputQueue.put((None, e)) 

を、あなたのポッピングのコードがこれを行う:

result, exception = outq.pop() 
if exception: 
    raise exception 
yield result 

あなたは、これはNode.jsのコールバックに似ていることに気づくことがありスタイルでは、すべてのコールバックに(err, result)を渡します。はい、それは迷惑で、あなたはそのスタイルでコードを台無しにするつもりです。しかし、実際にはラッパー以外の場所では使用していません。キューから値を取得するか、またはrunの内部で呼び出されるすべての "アプリケーションレベル"のコードは、通常の返品/利回りと発生した例外を表示します。

Futureconcurrent.futuresという仕様(またはそのクラスをそのまま使用)にすることも、ジョブのキューイングを実行していても手動で実行しているとします。これは難しいことではありません。特に、デバッグ用の非常に優れたAPIを提供します。

最後に、キューごとに1つのワーカーだけが確実に必要な場合でも、エグゼキュータ/プールの設計では、ワーカーやキューを使用して構築されたほとんどのコードを非常に簡単にすることができます。すべてのボイラープレートを取り除き、Worker.runメソッドのループを関数に置き換えます(キューに追加するのではなく、通常はreturnsまたはraisesです)。呼び出し側では、すべてのボイラープレートを再度スクラップし、そのパラメータを使用してジョブ機能をsubmitまたはmapとするだけです。

あなたの全体の例では、に削減することができます。

def job(i): 
    # (code that does stuff) 
    1/0 # Dumb error 
    # (more code that does stuff) 
    return result 

with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor: 
    results = executor.map(job, range(10)) 

そして、それは自動的に適切に例外を処理します。


コメントに記載されているとおり、例外のトレースバックは子プロセスをトレースしません。手動のraise resultコール(または、プールまたはエグゼキュータを使用している場合は、プールまたはエグゼキュータの不具合)までしか流れません。

multiprocessing.Queuepickleの上に構築され、酸洗例外はそのトレースバックを漬けません。そしてその理由は、あなたがトレースバックをピケることができないということです。その理由は、トレースバックにはローカル実行コンテキストへの参照がたくさんあるため、別のプロセスで動作させることは非常に難しいことです。

だから、これについて何ができますか?完全に一般的な解決法を探しに行ってはいけません。代わりに、実際に必要なものについて考えてみてください。 90%の時間、 "トレースバックで例外をログに記録し、続行する"または "トレースバックで例外をデフォルトの未処理例外ハンドラのようにstderrexit(1)に出力する"いずれの場合でも、例外をまったく渡す必要はありません。子側でフォーマットし、文字列を渡します。 の場合、には何かもっと欲しいものが必要です。必要なものを正確に整理し、手動でまとめておくだけの十分な情報を渡します。トレースバックと例外をフォーマットする方法がわからない場合は、tracebackモジュールを参照してください。それはかなり簡単です。そして、これはあなたがピクルの機械にまったく入る必要がないことを意味します。 (copyregピックラーや、__reduce__メソッドなどのホルダークラスを書くことは非常に難しいことではありませんが、そうしなければならない理由は何ですか?)

+1

ありがとう!これは素晴らしい。しかし、スタックトレース全体を印刷する方法はありますか?それは、エラーが現在発生していることと、それが何であるかを示しますが、Workerクラスのどこにエラーが発生しているかはわかりません。 – hendra

+0

@npo:それを説明するために答えに追加します。 – abarnert

+0

これは、単に結果をコールバックに返す関数を使用する 'apply_async'にどのように適用できますか。 try/exceptで非同期関数の内部をラップして、例外オブジェクトをコールバックに返すだけですか? – CMCDragonkai

1

、ちょうど拡張コメント。このプログラムを実行AN何を得るの出力(もしあれば)を教えてください:

from multiprocessing import Process, Queue 

class Worker(Process): 

    def __init__(self, inputQueue, outputQueue): 

     super(Worker, self).__init__() 

     self.inputQueue = inputQueue 
     self.outputQueue = outputQueue 

    def run(self): 

     for i in iter(self.inputQueue.get, 'STOP'): 

      # (code that does stuff) 

      1/0 # Dumb error 

      # (more code that does stuff) 

      self.outputQueue.put(result) 

if __name__ == '__main__': 
    inq, outq = Queue(), Queue() 
    inq.put(1) 
    inq.put('STOP') 
    w = Worker(inq, outq) 
    w.start() 

は私が取得:

% test.py 
Process Worker-1: 
Traceback (most recent call last): 
    File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap 
    self.run() 
    File "/home/unutbu/pybin/test.py", line 21, in run 
    1/0 # Dumb error 
ZeroDivisionError: integer division or modulo by zero 

私はあなたが何を取得(場合)驚いています。

+0

シェルのPOSIXで何も得られなかったら私は驚いています。しかし、WindowsやIDLEやPyDevの場合や、親プロセスがGUIアプリケーションの場合は、どちらの方法でも賭けることはできません。 – abarnert

+0

@unutbu私は何も得ていません。 64ビットWindowsとIDLEの使用。 – hendra

+0

@npo:そうです、それをコンソールから実行するとどうなりますか? – unutbu

2

私はあなたがこの問題を示し、最小限のテストケースを投稿することができます

from multiprocessing import Queue, Process, RawValue, Semaphore, Lock, Pool 
import traceback 
run_old = Process.run 

def run_new(*args, **kwargs): 
    try: 
     run_old(*args, **kwargs) 
    except (KeyboardInterrupt, SystemExit): 
     raise 
    except: 
     traceback.print_exc(file=sys.stdout) 

Process.run = run_new 
+0

シンプル、ベストアンサー – CloudyGloudy

関連する問題