2016-11-02 6 views
1

私のプロジェクトでは、私が実行するタスクの一覧があります。asyncioのタスクトップアップ

loop.run_until_complete(tasks) 

しかし、無限の数のタスクがあるので、現時点では、それらをバッチで実行します。私は、タスクの数、Iランチ非同期にこれらのタスクを実行するためにループを使用して結果を返す通常の関数を得る

def get_results(tasks): 
    return [result for result in loop.run_until_complete(handle_tasks(tasks))] 

while True: 
    tasks = get_tasks() 
    results = get_results(tasks) 

:基本的に、私はこれを持っています。

このアプローチは機能しますが、改善できると思います。

タスクのバッチを実行する代わりに、タスクのトップアップをしたいと思います。このような

何か:

while True: 
    if current_tasks < max_tasks: 
     new_tasks = get_tasks(max_tasks - current_tasks) 
     add_tasks(new_tasks) 
    current_tasks, results = stats_and_results() 

私はこの問題にアプローチする方法上の任意のアイデアを感謝しています。

ありがとうございます!

+0

タスクを 'my_task_list'に追加する方法と、リストが一定の長さよりも短い場合は追加する方法はありますか?次にリストを時々 'cancel()'や 'cancelled()'タスクを削除するために繰り返しますか? – shongololo

答えて

2

同様の問題があり、ジョブを受け取り、あらかじめ定義された同時実行性で実行する小さな「プール」ラッパーを作成しました。

あなたは、この方法でそれを使用することができます
import asyncio 
import sys 


class Pool: 
    def __init__(self, concurrency): 
     self._sem = asyncio.BoundedSemaphore(concurrency) 
     self.jobs = [] 

    async def __aenter__(self): 
     return self 

    async def __aexit__(self, *_): 
     if len(self.jobs) > 0: 
      await asyncio.wait(self.jobs) 

    def put(self, coro): 
     assert asyncio.iscoroutine(coro) 
     async def wrapped(): 
      async with self._sem: 
       await coro 
     fut = asyncio.ensure_future(wrapped()) 
     self.jobs.append(fut) 

    async def __aiter__(self): 
     return self 

    async def __anext__(self): 
     try: 
      coro = self.jobs.pop(0) 
     except IndexError: 
      raise StopAsyncIteration() 
     else: 
      return await coro 

async def main(): 
    pool = Pool(10) 
    for task in get_tasks(): 
     pool.put(task) 
    async for result in pool: 
     print('got', result) 

これは、すべてのタスクをスケジュールし、同時にそれらの高々10を実行すると、彼らは(メインに来るような結果が返されますが)

をコルーチン