私は1人のプロデューサーと2 遅い消費者とのプログラムを持っていると私は、各消費者が最後の値だけを処理するような方法でコルーチンでそれを書き換えたいのですがasyncio.Event()でブロッキングした消費者 (私はスレッドとthreading.Queue()
を使用しましたが、それをブロックしてput()
にブロックすると、ほとんどの場合キューが満杯になってしまいます)。Pythonのasyncio -
answer to this questionを読んだ後、私はasyncio.Event
とasyncio.Queue
を使用することに決めました。私は、このプロトタイプのプログラムを書いた:
import asyncio
async def l(event, q):
h = 1
while True:
# ready
event.set()
# get value to process
a = await q.get()
# process it
print(a * h)
h *= 2
async def m(event, q):
i = 1
while True:
# pass element to consumer, when it's ready
if event.is_set():
await q.put(i)
event.clear()
# produce value
i += 1
el = asyncio.get_event_loop()
ev = asyncio.Event()
qu = asyncio.Queue(2)
tasks = [
asyncio.ensure_future(l(ev, qu)),
asyncio.ensure_future(m(ev, qu))
]
el.run_until_complete(asyncio.gather(*tasks))
el.close()
を、私はq.get()
ライン上でそのl
コルーチンブロックに気づいたし、何も印刷されません。
私は(私は1,11,21、...を得る)の両方でasyncio.sleep()
を追加した後に期待どおりに動作します:
import asyncio
import time
async def l(event, q):
h = 1
a = 1
event.set()
while True:
# await asyncio.sleep(1)
a = await q.get()
# process it
await asyncio.sleep(1)
print(a * h)
event.set()
async def m(event, q):
i = 1
while True:
# pass element to consumer, when it's ready
if event.is_set():
await q.put(i)
event.clear()
await asyncio.sleep(0.1)
# produce value
i += 1
el = asyncio.get_event_loop()
ev = asyncio.Event()
qu = asyncio.Queue(2)
tasks = [
asyncio.ensure_future(l(ev, qu)),
asyncio.ensure_future(m(ev, qu))
]
el.run_until_complete(asyncio.gather(*tasks))
el.close()
...しかし、私はそれなしで解決策を探しています。
なぜそうですか?どうすれば修正できますか?私はawait l()
をm
から呼び出すことはできないと思います。それらの両方に状態があります(元のプログラムでは最初のPyGameと2番目のプロットの結果が表示されます)。
単に 'Event'を使うのではなく、' maxsize'キューに頼るのはなぜですか? ( 'queue.put'は、スロットが待ち行列で利用可能になるまでブロックします) – Vincent
@Vincentはい、そうですが、私が最初に書いたように、putにブロックしたくないのです。私はスレッドと 'threading.Queue'のソリューションを持っていましたが、プロデューサはほとんどの場合、最も遅い消費者を待っています。私はコルーチンを試してみたいと思いますし、 'asyncio.Event'のふるまいは私を困惑させるので、疑問です。 –