2009-10-14 10 views
12

私はQueueクラスにjoin()できますが、呼び出しがまだ戻っていない場合はしばらくしてからタイムアウトしたいと考えています。 これを行うにはどうすればよいですか?キュー\をメタクラスを使ってサブクラス化することで可能ですか?PythonのQueue.join()にタイムアウト引数を追加する

+1

あなたは、すべてのワーカースレッドが(task_doneで終わることを保証するだろう) – tuergeist

答えて

17

サブクラスおそらく最も良い方法はです。このような何かが(未テスト)動作するはずです:

def join_with_timeout(self, timeout): 
    self.all_tasks_done.acquire() 
    try: 
     endtime = time() + timeout 
     while self.unfinished_tasks: 
      remaining = endtime - time() 
      if remaining <= 0.0: 
       raise NotFinished 
      self.all_tasks_done.wait(remaining) 
    finally: 
     self.all_tasks_done.release() 
+1

ありがとう! all_task_doneについての情報はどこで入手できましたか?私はhttp://docs.python.org/library/queue.html#module-Queueを見ましたが、私はそのメンバの言及を見ません... – olamundo

+3

Queueのソースコードを読むことができます。 'put'と' get'に対して 'timeout'パラメータが実装されていますが、同様のアプローチを使うために' join'を拡張するのは簡単です。 –

+0

うーん、スマートな解決;) – tuergeist

0

最初に、あなたがスレッドにキューのコードをラップすることができ、task_done()

とキュー出口ですべての作業スレッドがQueueでタイムアウト機能を実装するためにことを確認する必要がありますし、使用してこのスレッドのタイムアウトを追加します

テストされていない例Thread.join([timeout])は、私が提案する何を概説する

def worker(): 
    while True: 
     item = q.get() 
     do_work(item) 
     q.task_done() 

def queuefunc(): 
    q = Queue() 
    for i in range(num_worker_threads): 
     t = Thread(target=worker) 
     t.setDaemon(True) 
     t.start() 

    for item in source(): 
     q.put(item) 

    q.join()  # block until all tasks are done 

t = Thread(target=queuefunc) 
t.start() 
t.join(100) # timeout applies here 
10

は(参加)方法がすべて完了するすべてのタスクを待っている程度です。あなたは、タスクが実際に完了したかどうか気にしない場合は、あなたが定期的に未完成のタスク数をポーリングすることができます

stop = time() + timeout 
while q.unfinished_tasks and time() < stop: 
    sleep(1) 

をタスクが実行されたとき、またはタイムアウト期間が経過したときに、このループは存在します。

レイモンド

関連する問題