2016-04-27 12 views
1

私は竜巻とウェブソケットを使用しています。竜巻のPeriodicCallbackでブロックコードを処理する方法

私がメッセージを受信すると(以下の例では「開始」&「停止」)、startはx秒ごとにmemcachedからのメッセージをポーリングし、json出力をwebsocketクライアントに返します。

"tornado.ioloop.PeriodicCallback"を使用してx秒ごとにポーリングします。だから、私が持っている質問は、この場合の関数 "poll_for_new_data"がブロックコード、すなわちmemcachedがjson.dumpsをjsonなどに変換することです。スロー ?これを処理するための別のデザインの提案がある場合は、教えてください。私はブロック機能のためのスレッドプールを使用する予定だった、それは良いアイデアですか(または)これを行うには良い方法がありますか?

このプロジェクトでは、特定の要件のためにC++ libmemcachedを使用しているため、pylibmc(memcachedクライアント)を使用する必要があります。私はそれがC + +のlibmemcachedをボンネットの下で使用しない限り、非同期memcacheライブラリを使用することはできません。お時間を

おかげ

注:私はここのコードを共有していなかったように私は私のCustomClass固有のコードをコメントしました。

from tornado import ioloop, websocket, httpserver, gen 
import tornado 
import json 
# from custom_class import CustomClass 


class CustomWebSocketHandler(tornado.websocket.WebSocketHandler): 

    def check_origin(self, origin): 
     # for CORS 
     print(origin) 
     return True 

    def open(self): 
     print("open " + self.get_argument("user")) 

    def on_message(self, message): 
     message = message.lower() 
     print("on_message: " + message) 
     if(message == "start"): 
      self.request = {"user": self.get_argument("user")} 
      #self.all_pids = set() 
      #self.custom_class = CustomClass(self.request) 
      # poll for data every x seconds 
      self.periodic_callback = tornado.ioloop.PeriodicCallback(
       self.poll_for_new_data, 10 * 1000) 
      self.periodic_callback.start() 
     elif(message == "stop"): 
      self.close() 

    def on_close(self): 
     print("on_close") 
     self.periodic_callback.stop() 

    def poll_for_new_data(self): 
     if self.ws_connection is None: 
      return 

     # check for new pids 
     # [queries memcached servers using pylibmc (http://sendapatch.se/projects/pylibmc/) client] 
     # new_pids = self.custom_class.get_new_pids(self.all_pids) 

     # if(len(new_pids) > 0): 
      # if self.ws_connection is not None: 
       # self.write_message(json.dumps(new_pids)) 

     # there is more code here which I did not include 
     # i.e. this function pools for process that register themselves 
     # and each process has messages. so every time this method is called 
     # we are looking for new PIDs and new messages for each PID 
     # sending the result to websocket client in JSON format 


def make_app(): 
    return tornado.web.Application([ 
     (r'/ws', CustomWebSocketHandler) 
    ]) 

if __name__ == "__main__": 
    app = make_app() 
    http_server = tornado.httpserver.HTTPServer(app) 
    http_server.listen(8888) 
    # http://tornadokevinlee.readthedocs.org/en/latest/ioloop.html 
    tornado.ioloop.IOLoop.current().start() 

答えて

0

悪いアイデアが開かれ、各WebSocketのインスタンスに新しい定期的なポーラーをバインドするように一般的に、それが聞こえます。別のポーラーをサーバー上の別のワーカープロセスとして実装し、発生したときに更新を公開するのはなぜでしょうか。

Redisのpubsubを使用して、各WebSocketインスタンスをRedisチャネルにサブスクライブし、各更新時にポーリング作業者から公開することができます。一般的に

+0

入力いただきありがとうございます。私はあなたのアプローチをショットにし、私の所見をここに掲載します。 – StudentForever

0

私の代わりにPeriodicCallbackのループコルーチンを使用することをお勧めします:http://www.tornadoweb.org/en/stable/guide/coroutines.html#running-in-the-background

これは、簡単にブロック呼び出しを管理するために、スレッドプールを使用するようになります。ちょうどyield thread_pool.submit(f)を使用してください。 PeriodicCallbackを使用してスレッドプールにタスクを送信しようとすると、オーバーランが複数の要求を並列に実行しないようにするのは難しい場合があります。

+0

ありがとう、それを試してみて、ここに私の調査結果を掲載します。申し訳ありませんが現在他のものに追いついた – StudentForever

関連する問題