2016-07-31 23 views
2

私はasyncioフレームワークに基づいてアプリを書いています。このアプリは、レート制限(最大2コール/秒)を持つAPIと相互作用します。だから私はAPIと対話するメソッドをレートリミッタとして使うためにセロリに移動しました。しかし、それはオーバーヘッドのように見えます。Asyncio&レート制限

coroutinsの実行を保証する新しいasyncioイベントループ(または他のもの)を作成する方法はありませんn /秒?

答えて

2

私はあなたがこのようにサイクルを書くことができると考えている:

while True: 
    t0 = loop.time() 
    await make_io_call() 
    dt = loop.time() - t0 
    if dt < 0.5: 
     await asyncio.sleep(0.5 - dt, loop=loop) 
+0

感謝を印刷します!私は答えを待つ間にこの方法でデコレータを作った。これがこれを行う正しい単一のアプローチだと思われます。これは本当です? –

+0

「単一の適切なアプローチ」とはどういう意味ですか? 私にとって、これは問題を解決するための最も単純で最も明白な方法ですが、十数個の複雑な解決策を招待することができます。 –

+0

これはまさに私が聞きたいものです:)ありがとう –

3

受け入れ答えが正確です。ただし、通常、可能な限り2QPSに近づけたいと考えています。このメソッドは並列化を提供しません。これは、make_io_call()の実行に1秒以上かかる場合に問題になる可能性があります。より良い解決策は、make_io_callにセマフォを渡すことです。これは、実行が開始できるかどうかを知るために使用できます。ここで

は、そのような実装です:レート制限が要件を下回った後RateLimitingSemaphoreはそのコンテキストを解放します。

import asyncio 
from collections import deque 
from datetime import datetime 

class RateLimitingSemaphore: 
    def __init__(self, qps_limit, loop=None): 
     self.loop = loop or asyncio.get_event_loop() 
     self.qps_limit = qps_limit 

     # The number of calls that are queued up, waiting for their turn. 
     self.queued_calls = 0 

     # The times of the last N executions, where N=qps_limit - this should allow us to calculate the QPS within the 
     # last ~ second. Note that this also allows us to schedule the first N executions immediately. 
     self.call_times = deque() 

    async def __aenter__(self): 
     self.queued_calls += 1 
     while True: 
      cur_rate = 0 
      if len(self.call_times) == self.qps_limit: 
       cur_rate = len(self.call_times)/(self.loop.time() - self.call_times[0]) 
      if cur_rate < self.qps_limit: 
       break 
      interval = 1./self.qps_limit 
      elapsed_time = self.loop.time() - self.call_times[-1] 
      await asyncio.sleep(self.queued_calls * interval - elapsed_time) 
     self.queued_calls -= 1 

     if len(self.call_times) == self.qps_limit: 
      self.call_times.popleft() 
     self.call_times.append(self.loop.time()) 

    async def __aexit__(self, exc_type, exc, tb): 
     pass 


async def test(qps): 
    executions = 0 
    async def io_operation(semaphore): 
     async with semaphore: 
      nonlocal executions 
      executions += 1 

    semaphore = RateLimitingSemaphore(qps) 
    start = datetime.now() 
    await asyncio.wait([io_operation(semaphore) for i in range(5*qps)]) 
    dt = (datetime.now() - start).total_seconds() 
    print('Desired QPS:', qps, 'Achieved QPS:', executions/dt) 

if __name__ == "__main__": 
    asyncio.get_event_loop().run_until_complete(test(100)) 
    asyncio.get_event_loop().close() 

Desired QPS: 100 Achieved QPS: 99.82723898022084