2017-05-18 8 views
0

this answerを読むと、私はasyncio.tasks.as_completedを横断しました。私はその機能が実際にどのように機能するのか理解していません。未完了のルーチンとして文書化されています。 イベントループに関連付けられたキューを作成し、各未来に完了コールバックを追加し、未来のキューと同じ数のアイテムをキューから取得しようとします。次のようにasyncio.as_completed work

コードのコアは、次のとおりです。

def _on_completion(f): 
     if not todo: 
      return # _on_timeout() was here first. 
     todo.remove(f) 
     done.put_nowait(f) 
     if not todo and timeout_handle is not None: 
      timeout_handle.cancel() 

    @coroutine 
    def _wait_for_one(): 
     f = yield from done.get() 
     if f is None: 
      # Dummy value from _on_timeout(). 
      raise futures.TimeoutError 
     return f.result() # May raise f.exception(). 

    for f in todo: 
     f.add_done_callback(_on_completion) 
    if todo and timeout is not None: 
     timeout_handle = loop.call_later(timeout, _on_timeout) 
    for _ in range(len(todo)): 
     yield _wait_for_one() 

私はこのコードがどのように機能するかを理解したいと思います。私の最大の質問は次のとおりです。

  • ループは実際にどこで実行されますか。 loop.run_until_cobmpleteまたはloop.run_foreverへの呼び出しが表示されません。では、ループはどのように進歩していますか?

  • メソッドのドキュメンテーションによれば、このメソッドは先物を返します。 結果を=

Fからの収量私は_wait_for_oneで戻りf.resultラインに対してという和解のトラブルを抱えている:あなたはas_completedでF用

のようなもの(先物)にそれを呼び出すことができること。文書化された呼び出し規約は正しいですか?もしそうなら、その収量はどこから来ますか?

答えて

1

コピーしたコードにはヘッダ部分がありません。これは非常に重要です。

# This is *not* a @coroutine! It is just an iterator (yielding Futures). 
def as_completed(fs, *, loop=None, timeout=None): 
    """Return an iterator whose values are coroutines. 

    When waiting for the yielded coroutines you'll get the results (or 
    exceptions!) of the original Futures (or coroutines), in the order 
    in which and as soon as they complete. 

    This differs from PEP 3148; the proper way to use this is: 

     for f in as_completed(fs): 
      result = yield from f # The 'yield from' may raise. 
      # Use result. 

    If a timeout is specified, the 'yield from' will raise 
    TimeoutError when the timeout occurs before all Futures are done. 

    Note: The futures 'f' are not necessarily members of fs. 
    """ 
    if futures.isfuture(fs) or coroutines.iscoroutine(fs): 
     raise TypeError("expect a list of futures, not %s" % type(fs).__name__) 
    loop = loop if loop is not None else events.get_event_loop() 
    todo = {ensure_future(f, loop=loop) for f in set(fs)} 
    from .queues import Queue # Import here to avoid circular import problem. 
    done = Queue(loop=loop) 
    timeout_handle = None 

    def _on_timeout(): 
     for f in todo: 
      f.remove_done_callback(_on_completion) 
      done.put_nowait(None) # Queue a dummy value for _wait_for_one(). 
     todo.clear() # Can't do todo.remove(f) in the loop. 

    def _on_completion(f): 
     if not todo: 
      return # _on_timeout() was here first. 
     todo.remove(f) 
     done.put_nowait(f) 
     if not todo and timeout_handle is not None: 
      timeout_handle.cancel() 

    @coroutine 
    def _wait_for_one(): 
     f = yield from done.get() 
     if f is None: 
      # Dummy value from _on_timeout(). 
      raise futures.TimeoutError 
     return f.result() # May raise f.exception(). 

    for f in todo: 
     f.add_done_callback(_on_completion) 
    if todo and timeout is not None: 
     timeout_handle = loop.call_later(timeout, _on_timeout) 
    for _ in range(len(todo)): 
     yield _wait_for_one() 

[どこループが実際に動作しますか?]

semplicityの便宜上、そのタイムアウトがNoneに設定されているとします。

as_completedは、コルーチンではなく、未来の繰り返しを期待しています。したがって、この先物はすでにループにバインドされており、実行のスケジュールが設定されています。言い換えれば、これらの先物はloop.create_taskまたはasyncio.ensure_futuresの出力です(これは明示的には明示されていません)。 ループは既にループを実行しており、ループが完了すると、将来の.done()メソッドはTrueを返します。

「完了」キューが作成されます。 「完了」キューは、asyncio.queue、すなわちループ«を使用してブロッキングメソッド(.get、.put)»を実装するキューであることに注意してください。

"todo = {..."という行によって、各コルーチンの未来(fsの要素)は、ループ«にバインドされた別の未来に包まれ、この最後の未来のdone_callbackは_on_completion関数。

_on_completion関数は、ループがコルーチンの実行を完了するときに呼び出されます。コルーチンは、fsがas_completed関数に渡されます。

_on_completion関数は、todoセットから "私たちの未来"を取り除き、その結果(未来が "fs"セットにあるコルーチン)をdoneキューに入れます。 他の点では、as_completed関数はすべて、未来の結果が完了キューに移動されるように、これらの未来をdone_callbackでアタッチしています。

次に、len(fs)== len(todo)回の場合、as_completed関数は "yield from done"をブロックするコルーチンを生成します。結果をdone完了キューに入れるのを待つ_on_completed(または_on_timeout)関数を待つ

as_completed呼び出し元によって実行された "yield from"は、結果が表示されるまで待機します行ってキュー。

[その収率はから来たのでしょうか?]

には値が.putされるまで、TODOが(asyncio.queueですので、あなたがすることができます(asyncio-)ブロックという事実から来ています)をキューに入れます。