2013-03-12 27 views
11

私は単純なPythonスレッドプールパターンの優れた実装を見てきましたが、私のニーズに合ったものは実際には見つかりませんでした。私はPython 2.7を使用していますが、私が見つけたすべてのモジュールは動作しないか、またはワーカーの例外を適切に処理しません。私が探している機能の種類を提供できるライブラリを誰かが知っているかどうかは疑問だった。ヘルプは非常に感謝します。 例外を処理するPythonスレッドプール

私の最初の試みをマルチプロセッシング

内蔵multiprocessingモジュールとあったが、これは、スレッドが、サブプロセスを使用していないとして、代わりに私たちは、オブジェクトが漬けすることができない問題に遭遇します。いいえ、ここには行きません。

from multiprocessing import Pool 

class Sample(object): 
    def compute_fib(self, n): 
     phi = (1 + 5**0.5)/2 
     self.fib = int(round((phi**n - (1-phi)**n)/5**0.5)) 

samples = [Sample() for i in range(8)] 
pool = Pool(processes=8) 
for s in samples: pool.apply_async(s.compute_fib, [20]) 
pool.join() 
for s in samples: print s.fib 

# PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed 

先物

だから私は、Python 3.2 hereのクールな同時機能のいくつかのバックポートがある参照してください。これは完璧で使いやすいと思われます。問題は、ワーカーの1人で例外が発生した場合、「ZeroDivisionError」などの例外のタイプのみを取得しますが、トレースバックがないため、例外が発生した行は示されません。コードをデバッグすることが不可能になります。立ち入り禁止。

from concurrent import futures 

class Sample(object): 
    def compute_fib(self, n): 
     phi = (1 + 5**0.5)/2 
     1/0 
     self.fib = int(round((phi**n - (1-phi)**n)/5**0.5)) 

samples = [Sample() for i in range(8)] 
pool = futures.ThreadPoolExecutor(max_workers=8) 
threads = [pool.submit(s.compute_fib, 20) for s in samples] 
futures.wait(threads, return_when=futures.FIRST_EXCEPTION) 
for t in threads: t.result() 
for s in samples: print s.fib 


# futures-2.1.3-py2.7.egg/concurrent/futures/_base.pyc in __get_result(self) 
# 354  def __get_result(self): 
# 355   if self._exception: 
#--> 356    raise self._exception 
# 357   else: 
# 358    return self._result 
# 
# ZeroDivisionError: integer division or modulo by zero 

ワーカプール

私はこのパターンhereの他の実装を発見しました。今回は例外が発生したときに出力されますが、私のipythonインタラクティブインタプリタはハング状態のまま残り、他のシェルからkillする必要があります。立ち入り禁止。

import workerpool 

class Sample(object): 
    def compute_fib(self, n): 
     phi = (1 + 5**0.5)/2 
     1/0 
     self.fib = int(round((phi**n - (1-phi)**n)/5**0.5)) 

samples = [Sample() for i in range(8)] 
pool = workerpool.WorkerPool(size=8) 
for s in samples: pool.map(s.compute_fib, [20]) 
pool.wait() 
for s in samples: print s.fib 

# ZeroDivisionError: integer division or modulo by zero 
# ^C^C^C^C^C^C^C^C^D^D 
# $ kill 1783 

スレッドプール

さらに他の実装here。今回例外が発生すると、stderrに出力されますが、スクリプトは中断されず、代わりに実行の継続を行います。これは例外の目的に反するものであり、物事を安全にすることができます。まだ使用できません。

from concurrent.futures import ThreadPoolExecutor, as_completed 

def div_zero(x): 
    return x/0 

with ThreadPoolExecutor(max_workers=4) as executor: 
    futures = executor.map(div_zero, range(4)) 
    for future in as_completed(futures): print(future) 

のPython:

import threadpool 

class Sample(object): 
    def compute_fib(self, n): 
     phi = (1 + 5**0.5)/2 
     1/0 
     self.fib = int(round((phi**n - (1-phi)**n)/5**0.5)) 

samples = [Sample() for i in range(8)] 
pool = threadpool.ThreadPool(8) 
requests = [threadpool.makeRequests(s.compute_fib, [20]) for s in samples] 
requests = [y for x in requests for y in x] 
for r in requests: pool.putRequest(r) 
pool.wait() 
for s in samples: print s.fib 

# ZeroDivisionError: integer division or modulo by zero 
# ZeroDivisionError: integer division or modulo by zero 
# ZeroDivisionError: integer division or modulo by zero 
# ZeroDivisionError: integer division or modulo by zero 
# ZeroDivisionError: integer division or modulo by zero 
# ZeroDivisionError: integer division or modulo by zero 
# ZeroDivisionError: integer division or modulo by zero 
# ZeroDivisionError: integer division or modulo by zero 
#---> 17 for s in samples: print s.fib 
# 
#AttributeError: 'Sample' object has no attribute 'fib' 

- - アップデート

futuresライブラリについて、パイソン3の動作は、Python 2.

futures_exceptions.pyと同じではないことが表示されます2.7.6出力:

Traceback (most recent call last): 
    File "...futures_exceptions.py", line 12, in <module> 
    for future in as_completed(futures): 
    File "...python2.7/site-packages/concurrent/futures/_base.py", line 198, in as_completed 
    with _AcquireFutures(fs): 
    File "...python2.7/site-packages/concurrent/futures/_base.py", line 147, in __init__ 
    self.futures = sorted(futures, key=id) 
    File "...python2.7/site-packages/concurrent/futures/_base.py", line 549, in map 
    yield future.result() 
    File "...python2.7/site-packages/concurrent/futures/_base.py", line 397, in result 
    return self.__get_result() 
    File "...python2.7/site-packages/concurrent/futures/_base.py", line 356, in __get_result 
    raise self._exception 
ZeroDivisionError: integer division or modulo by zero 

のPython 3.3.2出力:

Traceback (most recent call last): 
    File "...futures_exceptions.py", line 11, in <module> 
    for future in as_completed(futures): 
    File "...python3.3/concurrent/futures/_base.py", line 193, in as_completed 
    with _AcquireFutures(fs): 
    File "...python3.3/concurrent/futures/_base.py", line 142, in __init__ 
    self.futures = sorted(futures, key=id) 
    File "...python3.3/concurrent/futures/_base.py", line 546, in result_iterator 
    yield future.result() 
    File "...python3.3/concurrent/futures/_base.py", line 392, in result 
    return self.__get_result() 
    File "...python3.3/concurrent/futures/_base.py", line 351, in __get_result 
    raise self._exception 
    File "...python3.3/concurrent/futures/thread.py", line 54, in run 
    result = self.fn(*self.args, **self.kwargs) 
    File "...futures_exceptions.py", line 7, in div_zero 
    return x/0 
ZeroDivisionError: division by zero 
+0

:この他の質問に対する私の答えをチェックアウト組み込みの 'map'。 –

答えて

3

私は個人的にはFuturesを使用しています。インターフェイスはとてもシンプルです。トレースバックの問題については、私はそれを保存するための回避策を見つけました。これは、問題を完全に解決していませんが、私は多くの場合、これらの問題のデバッグに使用している1つのトリックが一時的に呼び出してpool.map` `への呼び出しを置き換えるされ

Getting original line number for exception in concurrent.futures

0

簡単な解決策:使用あなたの最善の選択肢どんなスーツ、そしてあなたの労働者に、独自のtry-exceptブロックを実装します。もし必要なら、ルートコールを囲む。

これらのライブラリは「間違って」例外を処理するとは言いません。彼らはプリミティブではあるが、デフォルトの動作をしています。デフォルトがあなたに合わない場合は、これを自分で処理することが期待されます。

+0

'try-execpt'ブロックを追加しても問題は解決しません。 'concurrent'の場合、私はまだ新しい例外を捕まえた後、元のトレースバックに到達できません。 'workerpool'の場合は、インタプリタがクラッシュする前に例外ブロックに行きません。 'threadpool'の場合、例外はまったく発生しないので、exceptブロックには決して行きません。 – xApple

+1

メインスレッドまたはプロセスで 'try'ブロックを考えています。私はあなたが関数ワーカープロセスが実行されている周りの '試行 'ブロックを使用すると言っています。ワーカースレッド/プロセスで例外を '発生させてメインのスクリプトに送るようにする場合は、最初に発生した箇所をキャッチする必要があります。 – slezica

+0

私は、実行したいすべての機能に対してエラー処理を書くつもりはありません。だから、あなた自身が言っているのは、私自身のグローバルなエラー処理を書くべきだということです。はい、私はライブラリの1つを選択してソースコードを編集して機能を追加することができましたが、それは私が避けたかったものです: – xApple