例。
import collections
import threading
import time
queue = collections.deque()
condition = threading.Condition()
def consumer():
condition.acquire()
while True:
while queue:
item = queue.popleft()
condition.release()
# do something with item
print(item)
condition.acquire()
condition.wait()
def push_item(item):
with condition:
queue.append(item)
condition.notify()
# From that point forward, it is just demonstration code to show how to use
def example_producer_thread(*args):
for arg in args:
push_item(arg)
consumer_thread = threading.Thread(target=consumer, name='queue consumer')
consumer_thread.daemon = True # so it does not prevent python from exiting
consumer_thread.start()
for example in [range(0, 10), range(10, 20), range(20, 30)]:
threading.Thread(target=example_producer_thread, args=example).start()
time.sleep(1) # let the consumer thread some time before the script gets killed
コアはここにある:
consumer()
は、消費者のスレッドでいくつかの他のスレッドがキューにアイテムを置くまで、それは(ポーリングなし)アイドル状態のままです。起動すると、キューに項目がなくなるまで、キューをロックし、アイテムを取得し、キューのロックを解除し、アイテムを処理します。その後、それを解放し、眠りに戻る。
push_item()
はキュー内の単一項目をプッシュし、起床すべきコンシューマスレッドに通知します。
残りはそれを実際の例にすることです。 example_producer_thread
は単にその引数をキューにプッシュします。そして、我々はそれらのうちの3つを開始し、それぞれがある範囲の数値で動作するので、結果を見ることができます。
キューにmaxlen
を追加するだけでよいです。おそらく、小規模なクラスに機能をカプセル化している間に、
「maxlen」パラメータを渡す['collections.deque'](https://docs.python.org/2/library/collections.html#collections.deque)を使用できます。 –
ああ、私はそれが必要です同期させる。そして、アイテムを利用できるようになるまで(消費者が連続してポーリングするのではなく)、消費者スレッドをブロックしたいので、単にmutexで保護するだけでは簡単ではないようです。 –
私もそれを考え出しました。私は再オープンしました –