2017-10-21 6 views
1

私は各ハンドラのためのユニークなキューを持つTornadoを使用して非同期サーバを作ろうとしています。ジョブは、エンドポイントが呼び出されるとキューに入れられます。キューからジョブを非同期に '消費する'コンシューマ関数があります。しかし、消費者の行動は、それをself.consumer()とかAsyncHandler.consumer()と呼ぶかによって異なる傾向があります。私の最初の推測は、インスタンスレベルのロックが原因であるが、その証拠を見つけることができないということです。私は4つの投稿要求を続けて発砲する。ここに2つのスニペットが出力されています。TornadoのRequestHandlerで__init__で呼び出された非同期のコンシューマは、静的に呼び出されるのとは異なる動作をするのはなぜですか?

import tornado.web 
from tornado import gen 
from time import sleep, time 
from tornado.queues import Queue 
from concurrent.futures import ThreadPoolExecutor 
from tornado.ioloop import IOLoop 

class AsyncHandler(tornado.web.RequestHandler): 

    JOB_QUEUE = Queue() 
    EXECUTOR = ThreadPoolExecutor() 

    def post(self): 
     job = lambda: sleep(3) or print("{}:handler called".format(int(time()))) 
     self.JOB_QUEUE.put(job) 
     self.set_status(200) 
     self.finish() 

    @staticmethod 
    @gen.coroutine 
    def consumer(): 
     while True: 
      job = yield AsyncHandler.JOB_QUEUE.get() 
      print("qsize : {}".format(AsyncHandler.JOB_QUEUE.qsize())) 
      print(AsyncHandler.JOB_QUEUE) 
      output = yield AsyncHandler.EXECUTOR.submit(job) 
      AsyncHandler.JOB_QUEUE.task_done() 


if __name__ == "__main__": 
    AsyncHandler.consumer() 
    APP = tornado.web.Application([(r"/test", AsyncHandler)]) 
    APP.listen(9000) 
    IOLoop.current().start() 

これは、予想される出力を与える:

qsize : 0 
<Queue maxsize=0 tasks=1> 
1508618429:handler called 
qsize : 2 
<Queue maxsize=0 queue=deque([<function...<lambda> at 0x7fbf8f741400>, <function... <lambda> at 0x7fbf8f760ea0>]) tasks=3> 
1508618432:handler called 
qsize : 1 
<Queue maxsize=0 queue=deque([<function AsyncHandler.post.<locals>.<lambda> at 0x7fbf8f760ea0>]) tasks=2> 
1508618435:handler called 
qsize : 0 
<Queue maxsize=0 tasks=1> 
1508618438:handler called 

output = yield AsyncHandler.EXECUTOR.submit(job)出力を返すために3秒かかるので、出力が3秒の遅延で到着します。また、その間にキューの構築を見ることができます。コードの面白い作品に今

:今、私たちは__init__内の消費者を呼んでいる

qsize : 0 
<Queue maxsize=0 tasks=1> 
qsize : 0 
<Queue maxsize=0 tasks=2> 
qsize : 0 
<Queue maxsize=0 tasks=3> 
qsize : 0 
<Queue maxsize=0 tasks=4> 
1508619138:handler called 
1508619138:handler called 
1508619139:handler called 
1508619139:handler called 

注:

import tornado.web 
from tornado import gen 
from time import sleep, time 
from tornado.queues import Queue 
from concurrent.futures import ThreadPoolExecutor 
from tornado.ioloop import IOLoop 

class AsyncHandler(tornado.web.RequestHandler): 
    JOB_QUEUE = Queue() 
    EXECUTOR = ThreadPoolExecutor() 

    def __init__(self, application, request, **kwargs): 
     super().__init__(application, request, **kwargs) 
     self.consumer() 

    def post(self): 
     job = lambda: sleep(3) or print("{}:handler called".format(int(time()))) 
     self.JOB_QUEUE.put(job) 
     self.set_status(200) 
     self.finish() 

    @staticmethod 
    @gen.coroutine 
    def consumer(): 
     while True: 
      job = yield AsyncHandler.JOB_QUEUE.get() 
      print("qsize : {}".format(AsyncHandler.JOB_QUEUE.qsize())) 
      print(AsyncHandler.JOB_QUEUE) 
      output = yield AsyncHandler.EXECUTOR.submit(job) 
      AsyncHandler.JOB_QUEUE.task_done() 


if __name__ == "__main__": 
    APP = tornado.web.Application([(r"/test", AsyncHandler)]) 
    APP.listen(9000) 
    IOLoop.current().start() 

出力不気味(と愉快には)のように見えます。タスクの構築と実行は並行して(キューの構築なしで)ほぼ同時に完了することがわかります。あたかもoutput = yield AsyncHandler.EXECUTOR.submit(job)がブロックしていないかのようです。多くの実験の後でさえ、私はこの動作を説明することができません。私は本当にいくつかの助けに感謝します。

答えて

0

1回実行されるため、最初のアプリケーションにはconsumerが1回だけ実行されます。各リクエストは消費者を「ブロック」(一度に1つだけループ)し、次のリクエストは前のリクエストの後に処理されます。

後者のアプリケーションでは、各リクエストで新しいconsumerループが開始されます(RequestHandlerはreqごとに作成されているため)。だから、最初のリクエストは次のものを「ブロック」しないので、getsubmitwhile Trueがあります。

+0

私は今それを言います!ありがとう!消費者を呼び出すインスタンスを印刷して確認しました。したがって、基本的に真のループは本当に単一のコンシューマコールに過ぎません。私の理解では、すべてのリクエストハンドラがメインスレッドで実行されるので、複数のコンシューマが問題になることはありません。あなたがこのアプローチに見ることができる特定の欠点はありますか? – ArikKartman

関連する問題