2016-12-26 13 views
2

私は非同期プログラミングを初めて使いこなしています。gen.coroutineを使用しても、トルネードリクエストの応答時間が長すぎます

私はgen.coroutineでRequestHandlerを装飾しても、リクエストがまだブロックされていることがわかりました。

は、ここでのpython 2.7.11と、簡単なコードで、4.4.1

@gen.coroutine 
def store_data(data): 
    try: 
     # parse_data 
     ... 
    except ParseError as e: 
     logger.warning(e) 
     return 
    yield motor.insert_many(parsed_data) # asynchronous mongo 
    print motor.count() 

class MainHandler(RequestHandler): 
    @gen.coroutine 
    def post(self): 
     try: 
      some_argument = int(self.get_argument("some", 0)) 
      data = self.request.body 
     except Exception: 
      self.write("Improper Argument") 
      self.finish() 
      return 
     IOLoop.current().spawn_callback(lambda: store_data(data)) 
     self.write("Request Done") 
     self.finish() 

を竜巻と私は10個のスレッドでテストを行いました。アクセスログでの応答時間によると、私はいくつかの要求が、このコードの行全体が

ある

更新set_blocking_log_threshold(0.5)

File "********", line 74, in <dictcomp> 
    data = [dict({"sid": sid}, **{key: value for key, value in i.iteritems() 

のトレースバックメッセージを

[I 161222 15:40:22 web:1971] 200 POST /upload/ (::1) 9.00ms 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 8.00ms 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 8.00ms 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 7.00ms 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 8.00ms 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 9.00ms 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 8.00ms 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 9.00ms 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 701.00ms # Seem blocked 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 696.00ms # Seem blocked 

を遮断したと仮定します

data = [dict({"sid": sid}, **{key: value for key, value in i.iteritems() if key in need_cols}) for i in v_data] 

およびアンパックロジックは、私が

def data_generator(v_data, need_cols, sid): 
    for i in v_data: 
     temp = {key: value for key, value in i.iteritems() if key in need_cols} # discard some keys 
     temp["sid"] = sid # add same `sid` to all items 
     yield temp 

@gen.coroutine 
def store_data(data): 
    try: 
     # parse_data 
     ... 
    except ParseError as e: 
     logger.warning(e) 
     return 
    ge = data_generator(v_data, need_cols, sid) 
    yield motor.insert_many(ge) # asynchronous mongo 
    print motor.count() 

ませしきい値警告ログは、任意のより多くの報告発電機にそれを変更し、この

data = [] 
# `v_data` is a huge dict which could be considered as a mongo collection, and `i` as a mongo document 
for i in v_data: 
    temp = {key: value for key, value in i.iteritems() if key in need_cols} # discard some keys 
    temp["sid"] = sid # add same `sid` to all items 
    data.append(temp) 

のようなものですが、応答時間はまだ

[I 170109 17:26:32 web:1971] 200 POST /upload/ (::1) 3.00ms 
[I 170109 17:26:33 web:1971] 200 POST /upload/ (::1) 2.00ms 
[I 170109 17:26:33 web:1971] 200 POST /upload/ (::1) 4.00ms 
[I 170109 17:26:33 web:1971] 200 POST /upload/ (::1) 3.00ms 
[I 170109 17:26:33 web:1971] 200 POST /upload/ (::1) 3.00ms 
[I 170109 17:26:33 web:1971] 200 POST /upload/ (::1) 2.00ms 
[I 170109 17:26:33 web:1971] 200 POST /upload/ (::1) 354.00ms 
[I 170109 17:26:33 web:1971] 200 POST /upload/ (::1) 443.00ms 
をブロックされたように見えました

次に、しきい値を0.2秒に設定しました。このメッセージ

File "*******", line 76, in store_data 
    increment = json.load(fr) 
    File "/usr/local/python2.7/lib/python2.7/json/__init__.py", line 291, in load 
    **kw) 
    File "/usr/local/python2.7/lib/python2.7/json/__init__.py", line 339, in loads 
    return _default_decoder.decode(s) 
    File "/usr/local/python2.7/lib/python2.7/json/decoder.py", line 364, in decode 
    obj, end = self.raw_decode(s, idx=_w(s, 0).end()) 
    File "/usr/local/python2.7/lib/python2.7/json/decoder.py", line 380, in raw_decode 
    obj, end = self.scan_once(s, idx) 

を手に入れた今、私は、問題はあなたのstore_dataコルーチン関数を呼び出しているかについてかもしれないと思うどのようにこの文の非同期

答えて

0

を作るには考えてきません。あなたは正しい方法でコルーチンを呼び出さなければなりません。それはhereに言及されたよう:

ほぼすべてのケースでは、コルーチンを呼び出す関数は コルーチン自体も、通話中yieldキーワードを使用する必要があります。

したがってstore_dataは、このように呼び出す必要があります。yield store_data()ではなく、store_data()です。ここで

IOLoop.current().spawn_callback(lambda: store_data(data)) 

あなたが引数としてあなたの関数にdataを与えたいと思うので、私はあなたがlambdaを使用していると思いますが、あなたはspawn_callback自身でこれを行うことができます。あなたはこれを試すことができます:

IOLoop.current().spawn_callback(store_data, data) 

希望がこれを助けます。

0

@gen.coroutineで関数をデコレートすると、その関数が決してうまくいかない場合は、yieldsとなります。

お客様のpost()メソッドは正しく見えます。IOLoopを妨害することはありません。しかし、IOLoopは共有リソースであり、ブロックするものはあなたが見ているタイミングを引き起こす可能性があります。 store_data(またはプログラムの他の場所)に表示されていないものがブロックされていると思われます。このブロッキングがどこにあるかを強調するには、プログラムの開始時にIOLoop.current().set_blocking_log_threshold(0.5)に電話をかけると、IOLoopが0.5秒間ブロックされたときにスタックトレースが記録されます。

+0

ありがとう、ベン!私は質問の説明を更新しました。私にさらに助けてくれたらいいですか? – Morry

+0

jsonの構文解析だけで長時間ブロッキングしているデータがたくさんある場合は、この作業をThreadPoolExecutorに移すだけでよいでしょう。 'increment = yield executor.submit(json.load、fr)'です。 jsonを解析したら、それを細かいバッチに分割して処理することができます。また、ストリーミングインターフェイスを高速化したり、サポートしたりするサードパーティのjsonライブラリもあります。 –

関連する問題