2016-12-10 12 views
0

私は1人のプロデューサーと2 遅い消費者とのプログラムを持っていると私は、各消費者が最後の値だけを処理するような方法でコルーチンでそれを書き換えたいのですがasyncio.Event()でブロッキングした消費者 (私はスレッドとthreading.Queue()を使用しましたが、それをブロックしてput()にブロックすると、ほとんどの場合キューが満杯になってしまいます)。Pythonのasyncio -

answer to this questionを読んだ後、私はasyncio.Eventasyncio.Queueを使用することに決めました。私は、このプロトタイプのプログラムを書いた:

import asyncio 

async def l(event, q): 
    h = 1 
    while True: 
     # ready 
     event.set() 
     # get value to process 
     a = await q.get() 
     # process it 
     print(a * h) 
     h *= 2 

async def m(event, q): 
    i = 1 
    while True: 
     # pass element to consumer, when it's ready 
     if event.is_set(): 
      await q.put(i) 
      event.clear() 
     # produce value 
     i += 1 

el = asyncio.get_event_loop() 
ev = asyncio.Event() 
qu = asyncio.Queue(2) 
tasks = [ 
      asyncio.ensure_future(l(ev, qu)), 
      asyncio.ensure_future(m(ev, qu)) 
     ] 
el.run_until_complete(asyncio.gather(*tasks)) 
el.close() 

を、私はq.get()ライン上でそのlコルーチンブロックに気づいたし、何も印刷されません。

私は(私は1,11,21、...を得る)の両方でasyncio.sleep()を追加した後に期待どおりに動作します:

import asyncio 
import time 

async def l(event, q): 
    h = 1 
    a = 1 
    event.set() 
    while True: 
     # await asyncio.sleep(1) 
     a = await q.get() 
     # process it 
     await asyncio.sleep(1) 
     print(a * h) 
     event.set() 

async def m(event, q): 
    i = 1 
    while True: 
     # pass element to consumer, when it's ready 
     if event.is_set(): 
      await q.put(i) 
      event.clear() 
     await asyncio.sleep(0.1) 
     # produce value 
     i += 1 

el = asyncio.get_event_loop() 
ev = asyncio.Event() 
qu = asyncio.Queue(2) 
tasks = [ 
      asyncio.ensure_future(l(ev, qu)), 
      asyncio.ensure_future(m(ev, qu)) 
     ] 
el.run_until_complete(asyncio.gather(*tasks)) 
el.close() 

...しかし、私はそれなしで解決策を探しています。

なぜそうですか?どうすれば修正できますか?私はawait l()mから呼び出すことはできないと思います。それらの両方に状態があります(元のプログラムでは最初のPyGameと2番目のプロットの結果が表示されます)。

+0

単に 'Event'を使うのではなく、' maxsize'キューに頼るのはなぜですか? ( 'queue.put'は、スロットが待ち行列で利用可能になるまでブロックします) – Vincent

+0

@Vincentはい、そうですが、私が最初に書いたように、putにブロックしたくないのです。私はスレッドと 'threading.Queue'のソリューションを持っていましたが、プロデューサはほとんどの場合、最も遅い消費者を待っています。私はコルーチンを試してみたいと思いますし、 'asyncio.Event'のふるまいは私を困惑させるので、疑問です。 –

答えて

1

m関数を実行しているタスクが停止することはないので、コードは期待通りに機能しません。タスクは、event.is_set()== Falseの場合、iをインクリメントし続けます。このタスクは決して中断されないので、関数lを実行するタスクは決して呼び出されません。したがって、関数mを実行しているタスクを中断する方法が必要です。サスペンドの1つの方法が別のコルーチンを待っています。それがasyncio.sleepが期待どおりに動作する理由です。

次のコードは期待どおりに動作すると思います。 LeakyQueueは、プロデューサからの最後の値だけがコンシューマによって処理されることを保証します。複雑さは非常に対称的であるため、消費者はプロデューサによって生成されたすべての値を消費します。遅延引数を増やすと、コンシューマはプロデューサによって作成された最後の値だけを処理することをシミュレートできます。

import asyncio 

class LeakyQueue(asyncio.Queue): 
    async def put(self, item): 
     if self.full(): 
      await self.get() 
     await super().put(item) 

async def consumer(queue, delay=0): 
    h = 1 
    while True: 
     a = await queue.get() 
     if delay: 
      await asyncio.sleep(delay) 
     print ('consumer', a) 
     h += 2 

async def producer(queue): 
    i = 1 
    while True: 
     await asyncio.ensure_future(queue.put(i)) 
     print ('producer', i) 
     i += 1 

loop = asyncio.get_event_loop() 
queue = LeakyQueue(maxsize=1) 
tasks = [ 
    asyncio.ensure_future(consumer(queue, 0)), 
    asyncio.ensure_future(producer(queue)) 
] 
loop.run_until_complete(asyncio.gather(*tasks)) 
+0

Pythonのドキュメントでは 'Event.wait()'はブロックされていますが、 'Event.is_set()'についてはそのような注意はありません。本気ですか?私はプロデューサーに後続の価値を生み出させたいと思っています。消費者は、時間通りに処理することができない値をスキップしてもらいたいです。 'event.wait()'と 'asyncio.sleep'(長い計算をシミュレートする - 私はそれがOKかどうかわかりません)' l'で私はちょうど** 1,2,3、... **を取得しますstdout。 –

+0

誤解を避けるための完全な例を教えてください。 –