2011-09-16 4 views
3

ロングアイズのポストを前にして謝っています。うまくいけば、それは解決のための十分な文脈を与えるでしょう。私は古いclassmethodの任意の数を取り、マルチスレッドのキューにそれらを固執するユーティリティ関数を作成しようとしました:Pythonスレッディング:何が欠けていますか? (task_done()があまりにも多く呼ばれました)

class QueuedCall(threading.Thread): 

    def __init__(self, name, queue, fn, args, cb): 
     threading.Thread.__init__(self) 
     self.name = name 

     self._cb = cb 
     self._fn = fn 
     self._queue = queue 
     self._args = args 

     self.daemon = True 
     self.start() 

    def run(self): 
     r = self._fn(*self._args) if self._args is not None \ 
      else self._fn() 

     if self._cb is not None: 
      self._cb(self.name, r) 

      self._queue.task_done() 

はここ

私の呼び出し元のコードが(クラス内)のようになります
data = {} 
def __op_complete(name, r): 
    data[name] = r 

q = Queue.Queue() 

socket.setdefaulttimeout(5) 

q.put(QueuedCall('twitter', q, Twitter.get_status, [5,], __op_complete)) 
q.put(QueuedCall('so_answers', q, StackExchange.get_answers, 
    ['api.stackoverflow.com', 534476, 5], __op_complete)) 
q.put(QueuedCall('so_user', q, StackExchange.get_user_info, 
    ['api.stackoverflow.com', 534476], __op_complete)) 
q.put(QueuedCall('p_answers', q, StackExchange.get_answers, 
    ['api.programmers.stackexchange.com', 23901, 5], __op_complete)) 
q.put(QueuedCall('p_user', q, StackExchange.get_user_info, 
    ['api.programmers.stackexchange.com', 23901], __op_complete)) 
q.put(QueuedCall('fb_image', q, Facebook.get_latest_picture, None, __op_complete)) 

q.join() 
return data 

私はここに実行している問題は、新鮮なサーバの再起動にごとタイムで働くように見えるということですが、エラーで、すべての第二または第三の要求を失敗:

ValueError: task_done() called too many times

このエラーは2番目または3番目の要求ごとにランダムなスレッドになりますので、正確にの問題を解決することはむしろ困難です。

誰でも意見や提案がありますか?

ありがとうございました。


編集:私はこの(迅速かつ汚いのではなく、ロギング)をデバッグするための努力でprint秒を追加した

runの1行目の印刷ステートメント(print 'running thread: %s' % self.name)とtask_done()print 'thread done: %s' % self.name)を呼び出す前のもう1つの印刷ステートメント

成功した要求の出力:

running thread: twitter 
running thread: so_answers 
running thread: so_user 
running thread: p_answers 
thread done: twitter 
thread done: so_user 
running thread: p_user 
thread done: so_answers 
running thread: fb_image 
thread done: p_answers 
thread done: p_user 
thread done: fb_image 

失敗した要求の出力:

running thread: twitter 
running thread: so_answers 
thread done: twitter 
thread done: so_answers 
running thread: so_user 
thread done: so_user 
running thread: p_answers 
thread done: p_answers 
Exception in thread p_answers: 
Traceback (most recent call last): 
    File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner 
    self.run() 
    File "/home/demian/src/www/projects/demianbrecht/demianbrecht/demianbrecht/helpers.py", line 37, in run 
    self._queue.task_done() 
    File "/usr/lib/python2.7/Queue.py", line 64, in task_done 
    raise ValueError('task_done() called too many times') 
ValueError: task_done() called too many times 

running thread: p_user 
thread done: p_user 
running thread: fb_image 
thread done: fb_image 
+0

スレッドが 'task_done'呼び出しの前に診断を出力するようにしてデバッグしようとしましたか? –

+0

@カール:はい、編集をご覧ください。 –

+0

もう少し詳しく:スレッドを0.1秒間スリープ状態にしても、エラーを再現することはできません。 Grr。明らかに、私はまだ問題の根本を見つけたいと思っています。 –

答えて

6

この問題へのあなたのアプローチは、 "型破り" です。しかし、今のところそれを無視し...問題は、次のワークフローは、スレッドを構築し、QueuedCallによって開始された

  1. を発生することが明らかに可能であるだけで、あなたが

    q.put(QueuedCall('twitter', q, Twitter.get_status, [5,], __op_complete)) 
    

    を与えているコードであります.__ init__

  2. キューqに入れられます。ただし、キューが項目を挿入するためのロジックを完了する前に、独立したスレッドはすでに作業を終了し、q.task_done()を呼び出そうとしました。エラーが発生します(キューにオブジェクトが安全に置かれる前にtask_done()が呼び出されました)。

どうすればいいですか?キューにスレッドを挿入しません。キューは、スレッドが処理するデータを保持します。その代わりにあなたは

  • キューを作成します。そこにあなたがあなたが作成
  • (例えば機能、彼らが望む引数やコールバックなど)完了したいジョブを挿入し、ワーカースレッドがに関数を取得する
    • q.get()を呼び出すワーカースレッドに
    • を開始呼び出す
    • q.task_done()を呼び出して、そのアイテムが処理されたことをキューに知らせる。
+0

驚くべきことに、説明のためにありがとう。その解決策は、「Queue」クラスの使用に関する私の根本的な誤解の中で明らかに生まれました。おそらく私はRTFMをもっと徹底的に学ぶべきです;) –

+0

しかし、好奇心から、私は*コンストラクタ内で*スレッドを '開始 'せず、むしろ' start'を外部から呼び出す*後* 'put'競合状態を解消するだろうか? (あなたの説明を考えれば、どんなストレッチでも*良い*アイデアになると言っているわけではありません。単に好奇心が強いです) –

2

私はここで誤解されてもよいが、私はあなたが正しくQueueを使用しているかわかりません。

ドキュメントの簡単な調査から、それはアイデアは、あなたがQueueに仕事を入れてputメソッドを使用することができるということであるように、その後、別のスレッドが作業を行う、そこからいくつかの仕事を得るためにgetを呼び出すことができます見えます、完了したらtask_doneに電話してください。

あなたのコードは、putのインスタンスがQueuedCallのキューに入っています。キューからのgetのインスタンスはありませんが、QueuedCallのインスタンスも、挿入されているキューへの参照が渡されます(キュー内からのものではなく、本来知っています)。 task_doneに電話してください。

私のすべての読んだことが正しい場合(私は見ることができない別の場所からgetメソッドを呼び出さないでください)、私は問題を理解していると信じています。

QueuedCallインスタンスはキューに入れる前に作成する必要があり、インスタンスを作成すると別のスレッドで作業が開始されるという問題があります。スレッドが作業を終了し、の前にtask_doneを呼び出した場合、メインスレッドはputQueuedCallをキューに管理しています。エラーが表示されることがあります。

偶然に初めて実行した場合にのみ動作すると思います。 GILはあなたを助けます。 QueuedCallスレッドが実際にGILを獲得し、すぐに実行を開始する可能性はあまりありません。実際にあなたがカウンタ以外のキューを気にしないということは、これがうまくいくようにも役立ちます。QueuedCallがまだ空でない限りキューにヒットしていないかどうかは関係ありません。(QueuedCallキュー内の別の要素task_doneがあり、時間がの場合、要素はtask_doneとなります。これはうまくいけばキューに入っていて、これでマークされます)。また、sleepを追加すると、新しいスレッドが少し待たされるため、スレッドが実際にキューに入っていることを確認するための時間がメインスレッドに与えられます。そのため、問題が隠蔽されます。

また、インタラクティブシェルを使用した簡単な操作からわかるように、実際には決してgetなので、キューは実際にはいっぱいです。受信したメッセージの数がputtask_done個であるため、joinが機能します。

QueuedCallクラスの仕組みを根本的に再設計するか、Queue以外の同期プリミティブを使用する必要があると思います。 A Queueは、すでに存在するワーカースレッドの作業をキューイングするために使用するように設計されています。キューに入れたオブジェクトのコンストラクタ内からスレッドを開始することは、あまり適切ではありません。

+0

徹底的な説明を歓迎します。ありがとう:) –

関連する問題