2016-03-18 7 views
7

私はasync forを使用して渡すことができる非同期のiterableを持っているとしたら、それを新しい非同期イテレータにマップしてフィルタリングする方法はありますか?同期式の反復可能関数を使って同じことを行う方法の適応である次のコードは、yieldasync defの内部では使用できないため動作しません。どのように非同期的にiterableを非同期にマッピング/フィルタリングできますか?

async def mapfilter(aiterable, p, func): 
    async for payload in aiterable: 
     if p(payload): 

      # This part isn't allowed, but hopefully it should be clear 
      # what I'm trying to accomplish. 
      yield func(payload) 
+0

https://pypi.python.org/pypi/paralleltools/0.0.3? – jonrsharpe

+0

@jonrsharpeこのlibはスレッドに関するasyncioではありません。 –

+0

内部の非同期関数を生成する方法を実装しようとしました:http://stackoverflow.com/a/37572657/1113207 –

答えて

4

サポートscheduled for Python 3.6あるrecently published PEP draft (PEP 525)は、あなたが来て、同じ構文を使用して非同期発電を許可するように提案していますと一緒に。

一方、非同期イテレータ定型文を処理したくない場合は、CryingCyclopsで示されたasyncio_extrasライブラリをコメントに使用することもできます。 the docsから

@async_generator 
async def mygenerator(websites): 
    for website in websites: 
     page = await http_fetch(website) 
     await yield_async(page) 

async def fetch_pages(): 
    websites = ('http://foo.bar', 'http://example.org') 
    async for sanitized_page in mygenerator(websites): 
     print(sanitized_page) 

yield from構文をサポートしていますasync_generator libraryもあります。

2

お客様can'tコルーチン内での収量を使用します。あなたのアイデアを実装するには、私が見る方法はAsynchronous Iteratorを実装することだけです。私が正しいとすれば、そのようなもの:

class MapFilter: 
    def __init__(self, aiterable, p, func): 
     self.aiterable = aiterable 
     self.p = p 
     self.func = func 

    async def __aiter__(self): 
     return self 

    async def __anext__(self): 
     while True: 
      payload = await self.aiterable.__anext__() # StopAsyncIteration would be raise here on no new values 
      if self.p(payload): 
       return self.func(payload) 

それを試してみましょう。ここでは(私はhereからそれを取った)ヘルパーarangeクラスで完全な例です:

import asyncio 


class arange: 
    def __init__(self, n): 
     self.n = n 
     self.i = 0 

    async def __aiter__(self): 
     return self 

    async def __anext__(self): 
     i = self.i 
     self.i += 1 
     if self.i <= self.n: 
      await asyncio.sleep(0) # insert yield point 
      return i 
     else: 
      raise StopAsyncIteration 


class MapFilter: 
    def __init__(self, aiterable, p, func): 
     self.aiterable = aiterable 
     self.p = p 
     self.func = func 

    async def __aiter__(self): 
     return self 

    async def __anext__(self): 
     while True: 
      payload = await self.aiterable.__anext__() 
      if self.p(payload): 
       return self.func(payload) 


async def main(): 
    aiterable = arange(5) 
    p = lambda x: bool(x>2) 
    func = lambda x: x*2 

    async for i in MapFilter(aiterable, p, func): 
     print(i) 

if __name__ == "__main__": 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(main()) 

出力:

6 
8 
+0

ええ、私は、これをすべて行うための構文的な砂糖があると思っていたと思います。あなたの答えを受け入れることはないと思われるので。 –

+2

asyncio_extrasには、構文上の砂糖を提供しています。http://pythonhosted.org/asyncio_extras/ – CryingCyclops

関連する問題