2017-12-03 22 views
0

キューが空で、すべてのスレッドがすでにタスクの処理を完了している(つまり、task_done()が実行された)かどうかを確認するオプションはありますか?両方の条件が満たされている場合にのみ、追加のタスクを追加したい。Python - キューからのすべてのタスクが完了したかどうかを確認する

私はもっと多くのタスクを追加できるように、私は未使用のスレッドを終了してactiveCount()を使うことはできません。また、実行の進行状況を積極的に監視できるようにするために、キューに参加したくありません。ここで

はサンプルコードです:

from Queue import Queue 
from threading import Thread 
import time 

queue = Queue() 

def my_method(queue): 
    while True: 
     task = queue.get() 

     time.sleep((task + 2) * 3) 

     queue.task_done() 

num_queue_threads = 2 
queue_threads = [None] * num_queue_threads 

for i in range(num_queue_threads): 
    queue_threads[i] = Thread(target=my_method, args=(queue,)) 
    queue_threads[i].setDaemon(True) 
    queue_threads[i].start() 

for task in range(3): 
    queue.put(task) 

#queue.join() #need to wait actively 

while True: 
    print("queue.qsize(): {}, queue.empty(): {}".format(queue.qsize(), queue.empty())) 

    time.sleep(1) 

キューはすぐに最後のタスクの実行が開始されるように空です。

答えて

0

このためのパブリックインターフェイスはありません。誰かがおそらく未完成のタスクを追跡するためにキューが使用する内部属性を掘り下げる脆弱なソリューションを投稿しようとしますが、真剣にそうしないでください。その属性は、文書化されたAPIの一部ではなく、将来のバージョンで名前を変更したり、再設計することができます。

自分でタスクの完了を追跡してください。

from Queue import Queue 
from threading import Thread 
import time 

task_queue = Queue() 
completion_queue = Queue() 

def my_method(in_queue, out_queue): 
    while True: 
     task = in_queue.get() 
     time.sleep((task + 2) * 3) 
     in_queue.task_done() 

     # Send completion message 
     out_queue.put(task) 

num_queue_threads = 2 
queue_threads = [None] * num_queue_threads 

for i in range(num_queue_threads): 
    queue_threads[i] = Thread(target=my_method, args=(task_queue, completion_queue)) 
    queue_threads[i].setDaemon(True) 
    queue_threads[i].start() 

for task in range(3): 
    task_queue.put(task) 

for _ in range(3): 
    completion_queue.get() 
    completion_queue.task_done() 
    print("One task done!") 

print("All done!") 
+0

:それはそれは割り当てられたタスクの数に等しいメッセージの数を受けていますまで、1つのオプションは、労働者は、オーケストレータに「タスクの完全な」メッセージを送信することができます別のキュー、およびオーケストレーターの待機を持っているだろうここで問題となるのは、completion_queue.get()がどれくらい時間がかかるかを制御できないため、定期的なアクティビティを実行できないことがあるということです。しかし、少なくとも未完成のタスクを追跡するための他の推奨される方法はないことは知っています。私はおそらく代わりにsqliteからタスクステータスを使用するように私のロジックを切り替える(私はすでにこのデータベースを使用して自分のメタデータを格納する)。ありがとう! –

+0

また、私はcompletion_queue.get()をタイムアウトで実行し、空の例外を無視し、必要な定期的なアクティビティーを実行し、次のタスクの待機に戻ることができます。 –

関連する問題