2017-08-01 14 views
6

私はasyncio.AbstractEventLoopに渡したいlistawaitablesを持っていますが、私はサードパーティのAPIに要求を絞る必要があります。Python Asyncioの調整非同期関数

futureをループに渡すのを待つ何かを避けたいのは、その間にループ待機をブロックするためです。どのようなオプションがありますか? SemaphoresThreadPoolsは同時に実行されている回数を制限しますが、それは私の問題ではありません。リクエストを100/secに抑える必要がありますが、リクエストを完了するまでにどれくらい時間がかかります。

これは、問題を示す標準ライブラリを使用した非常に簡潔な(非)動作例です。これは100/secでスロットルするが、116.651/secでスロットルすると考えられている。 asyncioで非同期要求のスケジューリングを抑制する最適な方法は何ですか?

の作業コード:

import asyncio 
from threading import Lock 

class PTBNL: 

    def __init__(self): 
     self._req_id_seq = 0 
     self._futures = {} 
     self._results = {} 
     self.token_bucket = TokenBucket() 
     self.token_bucket.set_rate(100) 

    def run(self, *awaitables): 

     loop = asyncio.get_event_loop() 

     if not awaitables: 
      loop.run_forever() 
     elif len(awaitables) == 1: 
      return loop.run_until_complete(*awaitables) 
     else: 
      future = asyncio.gather(*awaitables) 
      return loop.run_until_complete(future) 

    def sleep(self, secs) -> True: 

     self.run(asyncio.sleep(secs)) 
     return True 

    def get_req_id(self) -> int: 

     new_id = self._req_id_seq 
     self._req_id_seq += 1 
     return new_id 

    def start_req(self, key): 

     loop = asyncio.get_event_loop() 
     future = loop.create_future() 
     self._futures[key] = future 
     return future 

    def end_req(self, key, result=None): 

     future = self._futures.pop(key, None) 
     if future: 
      if result is None: 
       result = self._results.pop(key, []) 
      if not future.done(): 
       future.set_result(result) 

    def req_data(self, req_id, obj): 
     # Do Some Work Here 
     self.req_data_end(req_id) 
     pass 

    def req_data_end(self, req_id): 
     print(req_id, " has ended") 
     self.end_req(req_id) 

    async def req_data_async(self, obj): 

     req_id = self.get_req_id() 
     future = self.start_req(req_id) 

     self.req_data(req_id, obj) 

     await future 
     return future.result() 

    async def req_data_batch_async(self, contracts): 

     futures = [] 
     FLAG = False 

     for contract in contracts: 
      req_id = self.get_req_id() 
      future = self.start_req(req_id) 
      futures.append(future) 

      nap = self.token_bucket.consume(1) 

      if FLAG is False: 
       FLAG = True 
       start = asyncio.get_event_loop().time() 

      asyncio.get_event_loop().call_later(nap, self.req_data, req_id, contract) 

     await asyncio.gather(*futures) 
     elapsed = asyncio.get_event_loop().time() - start 

     return futures, len(contracts)/elapsed 

class TokenBucket: 

    def __init__(self): 
     self.tokens = 0 
     self.rate = 0 
     self.last = asyncio.get_event_loop().time() 
     self.lock = Lock() 

    def set_rate(self, rate): 
     with self.lock: 
      self.rate = rate 
      self.tokens = self.rate 

    def consume(self, tokens): 
     with self.lock: 
      if not self.rate: 
       return 0 

      now = asyncio.get_event_loop().time() 
      lapse = now - self.last 
      self.last = now 
      self.tokens += lapse * self.rate 

      if self.tokens > self.rate: 
       self.tokens = self.rate 

      self.tokens -= tokens 

      if self.tokens >= 0: 
       return 0 
      else: 
       return -self.tokens/self.rate 


if __name__ == '__main__': 

    asyncio.get_event_loop().set_debug(True) 
    app = PTBNL() 

    objs = [obj for obj in range(500)] 

    l,t = app.run(app.req_data_batch_async(objs)) 

    print(l) 
    print(t) 

編集:私は、セマフォを使用して、ここでTrottleTestAppの簡単な例を追加しましたが、それでも実行絞ることができません:あなたはこれを行うことができます

import asyncio 
import time 


class ThrottleTestApp: 

    def __init__(self): 
     self._req_id_seq = 0 
     self._futures = {} 
     self._results = {} 
     self.sem = asyncio.Semaphore() 

    async def allow_requests(self, sem): 
     """Permit 100 requests per second; call 
      loop.create_task(allow_requests()) 
     at the beginning of the program to start this routine. That call returns 
     a task handle that can be canceled to end this routine. 

     asyncio.Semaphore doesn't give us a great way to get at the value other 
     than accessing sem._value. We do that here, but creating a wrapper that 
     adds a current_value method would make this cleaner""" 

     while True: 
      while sem._value < 100: sem.release() 
      await asyncio.sleep(1) # Or spread more evenly 
            # with a shorter sleep and 
            # increasing the value less 

    async def do_request(self, req_id, obj): 
     await self.sem.acquire() 

     # this is the work for the request 
     self.req_data(req_id, obj) 

    def run(self, *awaitables): 

     loop = asyncio.get_event_loop() 

     if not awaitables: 
      loop.run_forever() 
     elif len(awaitables) == 1: 
      return loop.run_until_complete(*awaitables) 
     else: 
      future = asyncio.gather(*awaitables) 
      return loop.run_until_complete(future) 

    def sleep(self, secs: [float]=0.02) -> True: 

     self.run(asyncio.sleep(secs)) 
     return True 

    def get_req_id(self) -> int: 

     new_id = self._req_id_seq 
     self._req_id_seq += 1 
     return new_id 

    def start_req(self, key): 

     loop = asyncio.get_event_loop() 
     future = loop.create_future() 
     self._futures[key] = future 
     return future 

    def end_req(self, key, result=None): 

     future = self._futures.pop(key, None) 
     if future: 
      if result is None: 
       result = self._results.pop(key, []) 
      if not future.done(): 
       future.set_result(result) 

    def req_data(self, req_id, obj): 
     # This is the method that "does" something 
     self.req_data_end(req_id) 
     pass 

    def req_data_end(self, req_id): 

     print(req_id, " has ended") 
     self.end_req(req_id) 

    async def req_data_batch_async(self, objs): 

     futures = [] 
     FLAG = False 

     for obj in objs: 
      req_id = self.get_req_id() 
      future = self.start_req(req_id) 
      futures.append(future) 

      if FLAG is False: 
       FLAG = True 
       start = time.time() 

      self.do_request(req_id, obj) 

     await asyncio.gather(*futures) 
     elapsed = time.time() - start 
     print("Roughly %s per second" % (len(objs)/elapsed)) 

     return futures 


if __name__ == '__main__': 

    asyncio.get_event_loop().set_debug(True) 
    app = ThrottleTestApp() 

    objs = [obj for obj in range(10000)] 

    app.run(app.req_data_batch_async(objs)) 
+0

@のように

import asyncio class AsyncLeakyBucket(object): def __init__(self, max_tasks: float, time_period: float = 60, loop: asyncio.events=None): self._delay_time = time_period/max_tasks self._sem = asyncio.BoundedSemaphore(max_tasks) self._loop = loop or asyncio.get_event_loop() self._loop.create_task(self._leak_sem()) async def _leak_sem(self): """ Background task that leaks semaphore releases based on the desired rate of tasks per time_period """ while True: await asyncio.sleep(self._delay_time) try: self._sem.release() except ValueError: pass async def __aenter__(self) -> None: await self._sem.acquire() async def __aexit__(self, exc_type, exc, tb) -> None: pass 

はまだ同じasync with bucketコードを使用することができます1秒間に実行されていた要求の数を制限しようとしています。たとえば、それぞれ3秒かかる100個の要求を開始すると、次の2秒間に200個以上の要求を開始できますか? –

+0

@AaronSchif開始時には問題ありません。1秒間のローリングウィンドウでは100を超えて開始されません。 – Jared

答えて

14

leaky bucket algorithm

import asyncio 
import time 

class AsyncLeakyBucket(object): 
    """A leaky bucket rate limiter. 

    Allows up to max_rate/time_period acquisitions before blocking. 

    time_period is measured in seconds; the default is 60. 

    """ 
    def __init__(self, max_rate: float, time_period: float = 60) -> None: 
     self._max_level = max_rate 
     self._rate_per_sec = max_rate/time_period 
     self._level = 0.0 
     self._last_check = 0.0 

    def _leak(self) -> None: 
     """Drip out capacity from the bucket.""" 
     if self._level: 
      # drip out enough level for the elapsed time since 
      # we last checked 
      elapsed = time.time() - self._last_check 
      decrement = elapsed * self._rate_per_sec 
      self._level = max(self._level - decrement, 0) 
     self._last_check = time.time() 

    def has_capacity(self, amount: float = 1) -> bool: 
     """Check if there is enough space remaining in the bucket""" 
     self._leak() 
     return self._level + amount <= self._max_level 

    async def acquire(self, amount: float = 1) -> None: 
     """Acquire space in the bucket. 

     If the bucket is full, block until there is space. 

     """ 
     if amount > self._max_level: 
      raise ValueError("Can't acquire more than the bucket capacity") 

     while not self.has_capacity(amount): 
      # wait for the next drip to have left the bucket 
      await asyncio.sleep(1/self._rate_per_sec) 

     self._level += amount 

    async def __aenter__(self) -> None: 
     await self.acquire() 
     return None 

    async def __aexit__(self, exc_type, exc, tb) -> None: 
     pass 

キャップをリークすることに注意してくださいバケツから便宜的に、レベルを下げるために別の非同期ループを実行する必要はありません。代わりに、容量が十分に残っているかどうかをテストするときに容量がリークアウトされます。

これはコンテキストマネージャとして使用できます。

bucket = AsyncLeakyBucket(100) 

# ... 

async with bucket: 
    # only reached once the bucket is no longer full 

か、直接acquire()を呼び出すことができます:

await bucket.acquire() # blocks until there is space in the bucket 

またはスペースが最初に存在する場合、あなたは、単にテストすることができますが、十分な容量が再び解放されるまで、それは完全なブロックであるときバケツを取得しようとしています:

if bucket.has_capacity(): 
    # reject a request due to rate limiting 

あなたはバケツにあなたが「ドリップ」量を増加または減少させることによって、「重い」または「軽い」として、いくつかの要求を数えることができることに注意してください:

await bucket.acquire(10) 
if bucket.has_capacity(0.5): 

デモ:

>>> import asyncio, time 
>>> bucket = AsyncLeakyBucket(5, 10) 
>>> async def task(): 
...  async with bucket: 
...   print('Drip!', time.time() - ref) 
... 
>>> ref = time.time() 
>>> tasks = [task() for _ in range(15)] 
>>> loop = asyncio.get_event_loop() 
>>> loop.run_until_complete(asyncio.wait(tasks)) 
Drip! 0.0016927719116210938 
Drip! 0.0017199516296386719 
Drip! 0.00173187255859375 
Drip! 0.0017418861389160156 
Drip! 0.001750946044921875 
Drip! 2.003826856613159 
Drip! 4.007770776748657 
Drip! 6.011734962463379 
Drip! 8.016689777374268 
Drip! 10.019418716430664 
Drip! 12.0219247341156 
Drip! 14.026055812835693 
Drip! 16.028339862823486 
Drip! 18.03285503387451 
Drip! 20.037498712539673 

バケットは、タスクの残りの部分がより均等に広がることを引き起こして、開始時にすぐに満たされています。 2秒ごとに十分な容量が解放され、別のタスクが処理されます。

上記の実装は、1/self._rate_per_secを待つことが容量を獲得するための良い戦略であると仮定している点で、単純化した側面で少しです。容量を待つ複数のタスクがある場合、要求したのと同じ順序で容量を取得する保証はありません。代わりにasyncio.Event() instanceをタイムアウトに使用し、_leakに容量を解放したときにそのイベントを介して容量を待機している人に通知することができます。 Eventは、キューを使用して待機中の要素を順番に通知します。

+0

"Greatest algorithms";)フォルダーにブックマークされています – glenfant

0

別の解決策 - 有界セマフォを使用して - 同僚、メンター、そして友人によって、以下の通りです:あなたはマルタインの答え

関連する問題