2016-06-30 17 views
1

私はasyncioジョブキューからジョブを消費し、並行してNジョブを処理するワーカークラスを作ろうとしています。ジョブによっては、追加のジョブをキューに入れることがあります。ジョブキューが空で、作業者が現在のすべてのジョブを終了すると、ジョブキューは終了するはずです。一度にN個のジョブを処理するAsyncioワーカー?

私はまだ概念的にasyncioと苦労しています。ここに私の試みの一つ、N=3です:

import asyncio, logging, random 

async def do_work(id_): 
    await asyncio.sleep(random.random()) 
    return id_ 

class JobQueue: 
    ''' Maintains a list of all pendings jobs. ''' 
    def __init__(self): 
     self._queue = asyncio.Queue() 
     self._max_id = 10 
     for id_ in range(self._max_id): 
      self._queue.put_nowait(id_ + 1) 

    def add_job(self): 
     self._max_id += 1 
     self._queue.put_nowait(self._max_id) 

    async def get_job(self): 
     return await self._queue.get() 

    def has_jobs(self): 
     return self._queue.qsize() > 0 

class JobWorker: 
    ''' Processes up to 3 jobs at a time in parallel. ''' 
    def __init__(self, job_queue): 
     self._current_jobs = set() 
     self._job_queue = job_queue 
     self._semaphore = asyncio.Semaphore(3) 

    async def run(self): 
     while self._job_queue.has_jobs() or len(self._current_jobs) > 0: 
      print('Acquiring semaphore...') 
      await self._semaphore.acquire() 
      print('Getting a job...') 
      job_id = await self._job_queue.get_job() 
      print('Scheduling job {}'.format(job_id)) 
      self._current_jobs.add(job_id) 
      task = asyncio.Task(do_work(job_id)) 
      task.add_done_callback(self.task_finished) 

    def task_finished(self, task): 
     job_id = task.result() 
     print('Finished job {}/released semaphore'.format(job_id)) 
     self._current_jobs.remove(job_id) 
     self._semaphore.release() 
     if random.random() < 0.2: 
      print('Queuing a new job') 
      self._job_queue.add_job() 

loop = asyncio.get_event_loop() 
jw = JobWorker(JobQueue()) 
print('Starting event loop') 
loop.run_until_complete(jw.run()) 
print('Event loop ended') 
loop.close() 

出力の抜粋:

Starting event loop 
Acquiring semaphore... 
Getting a job... 
Scheduling job 1 
Acquiring semaphore... 
Getting a job... 
Scheduling job 2 
Acquiring semaphore... 
Getting a job... 
Scheduling job 3 
Acquiring semaphore... 
Finished job 2/released semaphore 
Getting a job... 
Scheduling job 4 
...snip... 
Acquiring semaphore... 
Finished job 11/released semaphore 
Getting a job... 
Finished job 12/released semaphore 
Finished job 13/released semaphore 

一度に3つ以下のジョブを処理中に正しくすべてのジョブを処理するために表示されます。ただし、最後のジョブが終了した後にプログラムがハングします。出力に示されているように、それはjob_id = await self._job_queue.get_job()にぶら下がっているようです。ジョブキューが空になると、このコルーチンは決して再開されず、ジョブキューが空である(ループの最上部にある)かどうかを確認するためのチェックに再び到達しません。

私はこれをいくつかの方法で回避しようとしましたが、概念的にはあまり適切ではないものがありました。私の現在のWIPは待ち行列と作業者の間でいくつかの先物を渡しており、それらのすべてでasyncio.wait(...)の組み合わせを使用していますが、それは醜い状態になり、私が見落としているエレガントな解決策があるのだろうかと思います。

答えて

2

queue.task_doneは、以前にエンキューされたタスクが完了したことを示します。次にqueue.joinqueue.getasyncio.waitを使用して結合できます。queue.joinが完了し、queue.getが完了しない場合、これはすべてのジョブが完了したことを意味します。

この例を参照してください:

class Worker: 

    def __init__(self, func, n=3): 
     self.func = func 
     self.queue = asyncio.Queue() 
     self.semaphore = asyncio.Semaphore(n) 

    def put(self, *args): 
     self.queue.put_nowait(args) 

    async def run(self): 
     while True: 
      args = await self._get() 
      if args is None: 
       return 
      asyncio.ensure_future(self._target(args)) 

    async def _get(self): 
     get_task = asyncio.ensure_future(self.queue.get()) 
     join_task = asyncio.ensure_future(self.queue.join()) 
     await asyncio.wait(coros, return_when='FIRST_COMPLETED') 
     if get_task.done(): 
      return task.result() 

    async def _target(self, args): 
     try: 
      async with self.semaphore: 
       return await self.func(*args) 
     finally: 
      self.queue.task_done() 
+0

私は本当にこのアイデアが好きですが、 'asyncio.wait()'は予期しない順序で結果を返すので、やや面倒です。このおもちゃの例では、可能な結果は 'None'または' int'だけなので、どのコルーチンが実際に終了したのかを簡単に把握できますが、実際にはどのコルーチンが最初に終了したのかを把握するのは非常に面倒です。 'asyncio.gather()'はもう少し意味がありますが、 'return_when'パラメータがありません。うーん... –

+0

@ mehaase私の編集を見てください( 'args'は' self.func'に渡す引数のリストであり、すべてのジョブが完了していれば 'None'です)。 – Vincent

+0

ああ、私は解決策を見た後、それはとても簡単です!ありがとうございました。 –

2

get_jobをシンプルにasyncio.wait_forでタイムアウトすることができます。たとえば1秒で、タイムアウト時にループの先頭に戻ります。

async def run(self): 
     while self._job_queue.has_jobs() or len(self._current_jobs) > 0: 
      print('Acquiring semaphore...') 
      await self._semaphore.acquire() 
      print('Getting a job...') 
      try: 
       job_id = await asyncio.wait_for(self._job_queue.get_job(), 1) 
      except asyncio.TimeoutError: 
       continue 
      print('Scheduling job {}'.format(job_id)) 
      self._current_jobs.add(job_id) 
      task = asyncio.Task(do_work(job_id)) 
      task.add_done_callback(self.task_finished) 
+0

1は、これは間違いなく最も簡単&きれいな修正ですが、それは少し不正行為のように感じています。イベントループが再開するタイミングを教えるのを待つのではなく、ジョブキューをポーリングしています。 (よく、ポーリングとコルーチンのハイブリッド) –

関連する問題