私はasyncio
ジョブキューからジョブを消費し、並行してN
ジョブを処理するワーカークラスを作ろうとしています。ジョブによっては、追加のジョブをキューに入れることがあります。ジョブキューが空で、作業者が現在のすべてのジョブを終了すると、ジョブキューは終了するはずです。一度にN個のジョブを処理するAsyncioワーカー?
私はまだ概念的にasyncio
と苦労しています。ここに私の試みの一つ、N=3
です:
import asyncio, logging, random
async def do_work(id_):
await asyncio.sleep(random.random())
return id_
class JobQueue:
''' Maintains a list of all pendings jobs. '''
def __init__(self):
self._queue = asyncio.Queue()
self._max_id = 10
for id_ in range(self._max_id):
self._queue.put_nowait(id_ + 1)
def add_job(self):
self._max_id += 1
self._queue.put_nowait(self._max_id)
async def get_job(self):
return await self._queue.get()
def has_jobs(self):
return self._queue.qsize() > 0
class JobWorker:
''' Processes up to 3 jobs at a time in parallel. '''
def __init__(self, job_queue):
self._current_jobs = set()
self._job_queue = job_queue
self._semaphore = asyncio.Semaphore(3)
async def run(self):
while self._job_queue.has_jobs() or len(self._current_jobs) > 0:
print('Acquiring semaphore...')
await self._semaphore.acquire()
print('Getting a job...')
job_id = await self._job_queue.get_job()
print('Scheduling job {}'.format(job_id))
self._current_jobs.add(job_id)
task = asyncio.Task(do_work(job_id))
task.add_done_callback(self.task_finished)
def task_finished(self, task):
job_id = task.result()
print('Finished job {}/released semaphore'.format(job_id))
self._current_jobs.remove(job_id)
self._semaphore.release()
if random.random() < 0.2:
print('Queuing a new job')
self._job_queue.add_job()
loop = asyncio.get_event_loop()
jw = JobWorker(JobQueue())
print('Starting event loop')
loop.run_until_complete(jw.run())
print('Event loop ended')
loop.close()
出力の抜粋:
Starting event loop
Acquiring semaphore...
Getting a job...
Scheduling job 1
Acquiring semaphore...
Getting a job...
Scheduling job 2
Acquiring semaphore...
Getting a job...
Scheduling job 3
Acquiring semaphore...
Finished job 2/released semaphore
Getting a job...
Scheduling job 4
...snip...
Acquiring semaphore...
Finished job 11/released semaphore
Getting a job...
Finished job 12/released semaphore
Finished job 13/released semaphore
一度に3つ以下のジョブを処理中に正しくすべてのジョブを処理するために表示されます。ただし、最後のジョブが終了した後にプログラムがハングします。出力に示されているように、それはjob_id = await self._job_queue.get_job()
にぶら下がっているようです。ジョブキューが空になると、このコルーチンは決して再開されず、ジョブキューが空である(ループの最上部にある)かどうかを確認するためのチェックに再び到達しません。
私はこれをいくつかの方法で回避しようとしましたが、概念的にはあまり適切ではないものがありました。私の現在のWIPは待ち行列と作業者の間でいくつかの先物を渡しており、それらのすべてでasyncio.wait(...)
の組み合わせを使用していますが、それは醜い状態になり、私が見落としているエレガントな解決策があるのだろうかと思います。
私は本当にこのアイデアが好きですが、 'asyncio.wait()'は予期しない順序で結果を返すので、やや面倒です。このおもちゃの例では、可能な結果は 'None'または' int'だけなので、どのコルーチンが実際に終了したのかを簡単に把握できますが、実際にはどのコルーチンが最初に終了したのかを把握するのは非常に面倒です。 'asyncio.gather()'はもう少し意味がありますが、 'return_when'パラメータがありません。うーん... –
@ mehaase私の編集を見てください( 'args'は' self.func'に渡す引数のリストであり、すべてのジョブが完了していれば 'None'です)。 – Vincent
ああ、私は解決策を見た後、それはとても簡単です!ありがとうございました。 –