2016-12-28 31 views
4

私は新しい非同期世界秩序にこのPython2.7コードを書き換えるしようとしている:すべての結果が計算されているので、私はその意志非同期実装としてこれを書き換えしようとしているまでPythonで非同期ジェネレータを作成するには?

def get_api_results(func, iterable): 
    pool = multiprocessing.Pool(5) 
    for res in pool.map(func, iterable): 
     yield res 

map()ブロック準備が整うとすぐに結果が得られます。 map()のように、戻り値はiterableと同じ順序で返さなければなりません。

import requests 

def get(i): 
    r = requests.get('https://example.com/api/items/%s' % i) 
    return i, r.json() 

async def get_api_results(): 
    loop = asyncio.get_event_loop() 
    futures = [] 
    for n in range(1, 11): 
     futures.append(loop.run_in_executor(None, get, n)) 
    async for f in futures: 
     k, v = await f 
     yield k, v 

for r in get_api_results(): 
    print(r) 

が、Pythonの3.6と私は取得しています:私はこれが(私があるため、従来の認証要件のrequestsを必要とする)しようとした

File "scratch.py", line 16, in <module> 
    for r in get_api_results(): 
TypeError: 'async_generator' object is not iterable 

私はこれを実現するにはどうすればよいですか?

+1

非同期コードブロック内のイベントループを入れないでください、非同期コードは、イベントループではなく、周りに他の方法で実行する必要があります。 –

+0

ありがとう!確かに、私はここで何かを逃しています。私が見たすべてのイベントループの例では、loop.run_until_complete(get_api_results())を使用しています。これは、私の理解では、呼び出しをブロックして結果を失うことになります。 –

+0

通常は、より多くのコルーチンが結果を処理し、それらを駆動するイベントループがあります。 –

答えて

4

古い(2.7)コードに関して、マルチプロセッシングは、スレッド処理があまりうまくいかないCPU集約的なタスクを同時に処理する、より単純なスレッドモジュールの強力なドロップイン置換えとみなされます。あなたのコードはCPUに束縛されていない可能性があります.HTTPリクエストを作成するだけで済みますし、スレッドを使って問題を解決できるかもしれません。

threadingを直接使用する代わりに、Python 3+にはconcurrent.futuresという素晴らしいモジュールがあり、クールなExecutorクラスを使用してよりクリーンなAPIを使用しています。このモジュールはPython 2.7のexternal packageとしても利用できます。

次のコードは、Python 2とPython 3で動作:このコードは、スレッドに基づいて、futures.ThreadPoolExecutorを使用

# For python 2, first run: 
# 
# pip install futures 
# 
from __future__ import print_function 

import requests 
from concurrent import futures 

URLS = [ 
    'http://httpbin.org/delay/1', 
    'http://httpbin.org/delay/3', 
    'http://httpbin.org/delay/6', 
    'http://www.foxnews.com/', 
    'http://www.cnn.com/', 
    'http://europe.wsj.com/', 
    'http://www.bbc.co.uk/', 
    'http://some-made-up-domain.coooom/', 
] 


def fetch(url): 
    r = requests.get(url) 
    r.raise_for_status() 
    return r.content 


def fetch_all(urls): 
    with futures.ThreadPoolExecutor(max_workers=5) as executor: 
     future_to_url = {executor.submit(fetch, url): url for url in urls} 
     print("All URLs submitted.") 
     for future in futures.as_completed(future_to_url): 
      url = future_to_url[future] 
      if future.exception() is None: 
       yield url, future.result() 
      else: 
       # print('%r generated an exception: %s' % (
       # url, future.exception())) 
       yield url, None 


for url, s in fetch_all(URLS): 
    status = "{:,.0f} bytes".format(len(s)) if s is not None else "Failed" 
    print('{}: {}'.format(url, status)) 

。ここでは多くの魔法がas_completed()に使われています。

あなたのPython 3.6コードはrun_in_executor()を使用してfutures.ProcessPoolExecutor()を作成し、実際には非同期IOを使用しません!!

asyncioを実際に進めたい場合は、asyncioをサポートするHTTPクライアント(aiohttpなど)を使用する必要があります。今つのプロセスに一つだけのスレッドを利用して、実際の非同期IOを使用して、

import asyncio 

import aiohttp 


async def fetch(session, url): 
    print("Getting {}...".format(url)) 
    async with session.get(url) as resp: 
     text = await resp.text() 
    return "{}: Got {} bytes".format(url, len(text)) 


async def fetch_all(): 
    async with aiohttp.ClientSession() as session: 
     tasks = [fetch(session, "http://httpbin.org/delay/{}".format(delay)) 
       for delay in (1, 1, 2, 3, 3)] 
     for task in asyncio.as_completed(tasks): 
      print(await task) 
    return "Done." 


loop = asyncio.get_event_loop() 
resp = loop.run_until_complete(fetch_all()) 
print(resp) 
loop.close() 

あなたが見ることができるように、asyncioas_completed()があります。ここではサンプルコードです。

+0

'コルーチンはジェネレータなので、単純な「利回り」を使うことはできません。 https://stackoverflow.com/a/37550568/2908138 – im7mortal

+0

@ im7mortal:ありがとう、私は答えからこの部分を削除しました。 – Udi

5

イベントループを別のコルーチンに入れます。それをしないでください。イベントループは、非同期コードの最も外側の「ドライバ」であり、同期して実行する必要があります。

あなたがフェッチされた結果を処理する必要がある場合は、そう多くのコルーチンを書きます。キューからデータを取得することも、フェッチを直接駆動することもできます。あなたは、たとえば、結果をフェッチし、処理し、メイン機能を持つことができ

async def main(loop): 
    for n in range(1, 11): 
     future = loop.run_in_executor(None, get, n) 
     k, v = await future 
     # do something with the result 

loop = asyncio.get_event_loop() 
loop.run_until_complete(main(loop)) 

をちゃんとあまりにので、あなたが使用する必要はありませんaiohttpのような非同期ライブラリを使用して非同期(async)get()機能を作りたいです実行者はまったくありません。

関連する問題