2016-12-21 9 views
1

私はいくつかのデータをフェッチして返すasyncioコルーチンをいくつか持っています。このように:普通の関数からPythonコルーチン(async def)を呼び出すテスト

async def fetch_data(*args): 
    result = await some_io() 
    return result 

基本的にこのコルーチンはコルーチンのチェーンから呼び出され、初期コルーチンは、タスクを作成することでrunnedされます。 しかし、どのようなテスト目的のために、私は一つだけコルーチンを実行するためにいくつかのファイルを実行しているだけで、このようしたい場合:

if __name__ == '__main__': 
    result = await fetch_data(*args) 
    print(result) 

をそして明らかに私は私が実行しようとしているので、これを行うと、コルーチンないからコルーチンを待つことはできませんが関数。 それでは、問題は、コルーチンから関数を呼び出さずにデータを取得する正しい方法はありますか? Futureのオブジェクトをresultに作成しておきますが、もう少しシンプルで鮮明な方法がいくつかありますか?

答えて

1

あなたのコルーチンを実行するために、イベントループを作成する必要があります。

def run_coroutine(f, *args, **kwargs): 
    loop = asyncio.get_event_loop() 
    result = loop.run_until_complete(f(*args, **kwargs)) 
    loop.close() 
    return result 

使用このような:

print(run_coroutine(async_func)) 

または:

import asyncio 

async def async_func(): 
    return "hello" 

loop = asyncio.get_event_loop() 
result = loop.run_until_complete(async_func()) 
loop.close() 

print(result) 

または関数として

assert "expected" == run_coroutine(fetch_data, "param1", param2="foo") 
関連する問題