2017-12-11 7 views
2

私はトルネードでREST APIを実装しており、それを非ブロックにしたいと考えています。python TornadoハンドラIOがサーバーネットワーク全体をブロックする

は現在、問題に関連するコードは次のようになります:

class ReprsHandler(web.RequestHandler): 
    async def get(self, name): 

     db = await dbf.create_handler() 

     if 'id' in list(self.request.query_arguments.keys()): 
      db_future = asyncio.ensure_future(db.get_repr(name, self.get_query_argument('id'))) 
     else: 
      db_future = asyncio.ensure_future(db.get_reprs(name)) 

     result = await db_future 
     response = result.toSerializedStream() 

     self.set_status(HTTPStatus.OK) 
     self.write(response) 
     self.set_header('Content-Type', 'text/plain') 
     self.finish() 


class App(object): 
    def __init__(self, loop): 
     self.server_app = web.Application(
      handlers=[ 
       (r"/api/v1/([a-zA-Z0-9_-]+)/reprs", ReprsHandler), 
      ] 
     ) 

def main(): 
    AsyncIOMainLoop().install() 
    loop = asyncio.get_event_loop() 
    app = App(loop) 
    server = tornado.httpserver.HTTPServer(app.server_app, max_body_size=config['max_upload_size'], max_buffer_size=config['max_upload_size']) 
    server.bind(config['server_port']) 
    server.start() 

    loop.run_forever() 

シンプルなコードが、それはすべてを送信するために約3〜4分かかるように、データは、非常に大きいです。

ハンドラのロジックとネットワーキングIOが両方ともノンブロッキングであると予想されましたが、応答としてデータを送信しながらサーバーネットワークをブロックします。論理は良いです。彼らは他の要求をブロックしません。

詳細:

  • このコードは、Python 3.5で実装され、Ubuntuの16.04、ドッキングウィンドウ上で実行されます。
  • サーバがポートプロキシとしてnginxを使用しています。

何か問題がありますか?私はこの問題の原因を知りません。

+0

'result.toSerializedStream()'とは何ですか?あなたは "データがかなり大きい"と述べているので、私は 'result'変数がデータであり、それが大きいと仮定しています。だから、 '.toSerializedStream()'が大量のデータに対してCPUバインドされたタスクを実行すると、それがブロックの理由になるかもしれません。 – xyres

+0

このメソッドは、データオブジェクトのシリアル化された文字列を生成するためにpickleダンプを使用しますが、ロジックの時間コストはネットワーキングコストほど大きくはありません。それでも、私はパフォーマンスの改善を行うことができます。ありがとう。 –

答えて

1

result.toSerializedStream()がデータを漬けていると述べたので、だから、ええ、あなたが正しいのは、ブロックがネットワークioのためだということです。

チャンクでデータを送信し、self.write()の後にself.flush()と電話をかけることができます。 flushを呼び出すと、応答がネットワークに書き込まれます。 awaitflushにあるので、データがネットワークソケットに書き込まれるまで、コルーチンは一時停止し、サーバーはブロックされません。これにより、他のハンドラを非同期で実行することができます。

コードサンプル:

async def get(self, name): 
    ... 
    response = result.toSerializedStream() 

    chunk_size = 1024 * 1024 * 10 # 10 MiB 

    start_byte = 0 
    while True: 
     chunk = response[start_byte : start_byte + chunk_size] 
     if not chunk: 
      break 
     self.write(chunk) 
     await self.flush() # wait while data is flushed to network 

     start_byte += chunk_size # move start_byte forward 

重要:ここで注意すべき

一つ重要なことは、self.flush()はかなり高速であることです。小さなデータをネットワークに流している場合は、awaitの遅延が非常に小さいため、コルーチンが停止せずに実行され、サーバーがブロックされます。上記のサンプルコードで

、私は10 MiBchunk_sizeを設定しましたが、コンピュータが高速であれば、await遅延が非常に、非常に小さくなり、全体のデータが送信されるまでループは一時停止せずに実行することがあります。

お客様のニーズに応じてchunk_sizeの値を増減することをお勧めします。


さらに改善提案:

全てのデータがメモリにあります。あなたのハンドラが非同期でブロックされないので、別のリクエストがReprsHandlerに来ると、メモリ内のデータストレージが増えます。ますます多くのリクエストが入ってくると、何が起こるのかを知ることができます。

これを避けるには、データをメモリに保存する代わりに、ファイルにダンプできます。その後、あなたのハンドラでちょうどopenファイルを読み込み、それをチャンクで読んで送信してください。

+0

素晴らしい提案。ほんとうにありがとう。すぐにそれに取り組む。 –

0

OK。この質問はとても愚かでした。

私はこの非ブロッキングAPIがパラレルネットワーキングとして動作することを期待していました。そのため、ネットワーキング全体が互いに干渉し合うことはありません。そして、それは竜巻が設計されているものではありません。明らかに非ブロッキングですが、依然としてシングルスレッドです。

関連する問題