私は、I/O重いブロッキング呼び出しに依存するいくつかのプロデューサ機能と、I/O重いブロッキング呼び出しにも依存する一部のコンシューマ機能を持っています。それらをスピードアップするために、私はGeventマイクロスレッドライブラリをグルーとして使用しました。私は、4人の消費者を持っているし、2台の生産を持っていると思いGeventでマルチプロデューサ、マルチ消費者パラダイムを実装するにはどうすればよいですか?
import gevent
from gevent.queue import *
import time
import random
q = JoinableQueue()
workers = []
producers = []
def do_work(wid, value):
gevent.sleep(random.randint(0,2))
print 'Task', value, 'done', wid
def worker(wid):
while True:
item = q.get()
try:
print "Got item %s" % item
do_work(wid, item)
finally:
print "No more items"
q.task_done()
def producer():
while True:
item = random.randint(1, 11)
if item == 10:
print "Signal Received"
return
else:
print "Added item %s" % item
q.put(item)
for i in range(4):
workers.append(gevent.spawn(worker, random.randint(1, 100000)))
#This doesnt work.
for j in range(2):
producers.append(gevent.spawn(producer))
#Uncommenting this makes this script work.
#producer()
q.join()
:ここ
は私のパラダイムは次のようになります。プロデューサは、信号が10であるときに終了する。消費者はこのキューからの供給を継続し、プロデューサおよびコンシューマが終了するとタスク全体が終了する。
ただし、これは機能しません。 for
というループが複数のプロデューサを生成し、1つのプロデューサしか使用しないとコメントアウトすると、スクリプトは正常に動作します。
私は間違ったことを理解できないようです。
アイデア?
おかげで
こんにちはZCH、私は完全にあなたの答えを踏襲していません。小さなスニペットを貼っていただけますか?それは少し物事を明確にします。 –
@MridangAgarwalla - 'q.join()'の前に 'プロデューサのプロデューサ:producer.join()'を書き込みます。この方法では、最初にすべてのプロデューサが作業を終了し、キューが空になるまで待機します。 – zch
ああ、多分私はそれを間違って実装しました。私は自分のプロデューサーとコンシューマーが同時に稼働することを望んでいました。つまり、プロデューサーは、すべてのキューアイテムが終了してプロデューサーがキューに物事を追加しなくなるまでコンシューマーがフィードを終えるまで、キューに追加を続けます。 –