2011-12-11 12 views
8

私はBrukvaサイトでサンプルアプリを見たとき、彼らはTornadoでRedis接続を処理する適切な方法は何ですか? (非同期 - パブ/サブ)

のWebSocketで「 のinit」方式で新しい接続を作っている、asycクライアントBrukvaと私のトルネードアプリケーションと一緒にRedisのを使用しています
class MessagesCatcher(tornado.websocket.WebSocketHandler): 
    def __init__(self, *args, **kwargs): 
     super(MessagesCatcher, self).__init__(*args, **kwargs) 
     self.client = brukva.Client() 
     self.client.connect() 
     self.client.subscribe('test_channel') 

    def open(self): 
     self.client.listen(self.on_message) 

    def on_message(self, result): 
     self.write_message(str(result.body)) 

    def close(self): 
     self.client.unsubscribe('test_channel') 
     self.client.disconnect() 

websocketの場合は問題ありませんが、一般的なTornado RequestHandlerのポストメソッドでは、長いポーリング操作(公開サブスクリプションモデル)でそれを処理する方法は問題ありません。私は更新ハンドラのすべてのポストメソッドで新しいクライアントconnetionを作っているこれは正しいアプローチですか?私がredisコンソールでチェックすると、新しいポスト操作ごとにクライアントが増えています。ここで

enter image description here

私のコードのサンプルです。

c = brukva.Client(host = '127.0.0.1') 
c.connect() 

class MessageNewHandler(BaseHandler): 
    @tornado.web.authenticated 
    def post(self): 

     self.listing_id = self.get_argument("listing_id") 
     message = { 
      "id": str(uuid.uuid4()), 
      "from": str(self.get_secure_cookie("username")), 
      "body": str(self.get_argument("body")), 
     } 
     message["html"] = self.render_string("message.html", message=message) 

     if self.get_argument("next", None): 
      self.redirect(self.get_argument("next")) 
     else: 
      c.publish(self.listing_id, message) 
      logging.info("Writing message : " + json.dumps(message)) 
      self.write(json.dumps(message)) 

    class MessageUpdatesHandler(BaseHandler): 
     @tornado.web.authenticated 
     @tornado.web.asynchronous 
     def post(self): 
      self.listing_id = self.get_argument("listing_id", None) 
      self.client = brukva.Client() 
      self.client.connect() 
      self.client.subscribe(self.listing_id) 
      self.client.listen(self.on_new_messages) 

     def on_new_messages(self, messages): 
      # Closed client connection 
      if self.request.connection.stream.closed(): 
       return 
      logging.info("Getting update : " + json.dumps(messages.body)) 
      self.finish(json.dumps(messages.body)) 
      self.client.unsubscribe(self.listing_id) 


     def on_connection_close(self): 
      # unsubscribe user from channel 
      self.client.unsubscribe(self.listing_id) 
      self.client.disconnect() 

同様のケースのサンプルコードを提供していただければ幸いです。

+0

Redis、ZMQ、Tornadoを使用したPythonでの非同期PubSub - https://github.com/abhinavsingh/async_pubsub –

答えて

2

あなたのアプリで接続をプールする必要があります。 brukvaはこれを自動的にサポートしていないようです(redis-pyはこれをサポートしていますが、性質上ブロックされているので竜巻とうまくいっていません)ので、独自の接続プールを作成する必要があります。

パターンはかなりシンプルですが、これらは実際の操作コードではありません。

class BrukvaPool(): 

    __conns = {} 


    def get(host, port,db): 
     ''' Get a client for host, port, db ''' 

     key = "%s:%s:%s" % (host, port, db) 

     conns = self.__conns.get(key, []) 
     if conns: 
      ret = conns.pop() 
      return ret 
     else: 
      ## Init brukva client here and connect it 

    def release(client): 
     ''' release a client at the end of a request ''' 
     key = "%s:%s:%s" % (client.connection.host, client.connection.port, client.connection.db) 
     self.__conns.setdefault(key, []).append(client) 

これはちょっと難しいかもしれませんが、それは主な考えです。

9

少し遅れましたが、私はtornado-redisを使っています。これは、竜巻のioloopと連携し、tornado.genモジュール

は、それは

pip install tornadoredis 

またはsetuptoolsの

easy_install tornadoredis 

とのピップからインストールすることができますが、あなたが本当にいけない

tornadoredisインストールそれを行う。リポジトリを複製して抽出することもできます。その後redis.connectは一度だけ呼び出され

あなたmain.pyに行くか、同等の

redis_conn = tornadoredis.Client('hostname', 'port') 
redis_conn.connect() 

に次のコードをRedisのために

python setup.py build 
python setup.py install 

接続を実行します。これはブロッキングコールなので、メインioloopを開始する前に呼び出す必要があります。すべてのハンドラ間で同じ接続オブジェクトが共有されます。

あなたは

settings = { 
    redis = redis_conn 
} 
app = tornado.web.Application([('/.*', Handler),], 
           **settings) 

使用のようなあなたのアプリケーションの設定に追加することができが

接続がself.settings['redis']時のハンドラで使用することができるか、それはたBaseHandlerとサブクラスのプロパティとして追加することができますtornadoredisそのクラスは他のリクエストハンドラのために用意されています。

class BaseHandler(tornado.web.RequestHandler): 

    @property 
    def redis(): 
     return self.settings['redis'] 
tornado.web.asynchronoustornado.gen.engineデコレータを使用するのRedisと通信する

class SomeHandler(BaseHandler): 

    @tornado.web.asynchronous 
    @tornado.gen.engine 
    def get(self): 
     foo = yield gen.Task(self.redis.get, 'foo') 
     self.render('sometemplate.html', {'foo': foo} 

詳細例と接続プーリングおよびパイプラインのような他の機能はで見つけることができる追加情報github repo。

関連する問題