2011-10-24 14 views
21

は、私はいくつかのメッセージを送信するためのRedis'のpubsubを使用したいが、以下のコードのように、listenを使用してブロックされたくない:non-blocking Redis pubsubは可能ですか?

import redis 
rc = redis.Redis() 

ps = rc.pubsub() 
ps.subscribe(['foo', 'bar']) 

rc.publish('foo', 'hello world') 

for item in ps.listen(): 
    if item['type'] == 'message': 
     print item['channel'] 
     print item['data'] 

最後forセクションでは、ブロックします。私はちょうど与えられたチャンネルがデータを持っているかどうかをチェックしたいのですが、どうすればこのことができますか? checkのような方法がありますか?

+1

聴いてブロックしたくない理由はありますか? Redis接続はかなり安く、一般的にはそれらのいくつかを生成するのが一般的です。 –

+1

Redis、ZMQ、Tornadoを使用したPythonの非同期PubSub - https://github.com/abhinavsingh/async_pubsub –

+1

.listen()の代わりにpubsubオブジェクトの.get_message()メソッドを使用します(以下の例があります)。 [この質問が投稿されたとき、そのメソッドはPython Redisドライバでサポートされていない可能性があります]。 –

答えて

7

私はそれが可能だろうとは思いません。チャンネルには「現在のデータ」がありません。チャンネルを購読してチャンネル上の他のクライアントがプッシュしているメッセージを受信し始めると、ブロックされたAPIになります。また、パブ/サブのためにRedis Commands documentationを見ると、より明確になります。

+0

私はこの回答を他のものと組み合わせて考えるとかなり完成しました。彼はこれを糸に入れることができた。もし彼がシャネルが活動しているときに即座に行動を起こさせたいなら、彼はそれを辞書に保存してロックミューテックスで辞書を調べる自分のチェック方法を持つことができます – jdi

1

あなたはパラダイムコードの別の種類の操作を行う必要がありますなしブロッキングコードに到達するために。新しいスレッドを使用してすべての変更をリッスンし、メインスレッドを別のものにすることは難しくありません。

また、あなたはメインスレッドとRedisの加入者スレッド間でデータを交換するために、いくつかのメカニズムが必要になります。

6

これは、ブロッキングリスナーをスレッドに作業例です。

import sys 
import cmd 
import redis 
import threading 


def monitor(): 
    r = redis.Redis(YOURHOST, YOURPORT, YOURPASSWORD, db=0) 

    channel = sys.argv[1] 
    p = r.pubsub() 
    p.subscribe(channel) 

    print 'monitoring channel', channel 
    for m in p.listen(): 
     print m['data'] 


class my_cmd(cmd.Cmd): 
    """Simple command processor example.""" 

    def do_start(self, line): 
     my_thread.start() 

    def do_EOF(self, line): 
     return True 


if __name__ == '__main__': 
    if len(sys.argv) == 1: 
     print "missing argument! please provide the channel name." 
    else: 
     my_thread = threading.Thread(target=monitor) 
     my_thread.setDaemon(True) 

     my_cmd().cmdloop() 
+0

GILは今のところピクチャに入っていませんか?代わりにマルチプロセッシング(http://docs.python.org/2/library/multiprocessing.html)を代わりに使用できますか?しかし、その方法ではプロセスを作成するオーバーヘッドが発生します。 – Pramod

+0

CPUゾーンでは動作していませんが、ネットワークリスンの代わりにGILがパーティを台無しにすることはありません。 – Andrew

0

gevent、gevent monkeyのパッチを適用して非ブロッキングのredis pubsubアプリケーションを構築できます。

42

あなたはノンブロッキング、非同期処理のことを考えているなら、あなたはおそらく使用している(または使用する必要があります)、非同期フレームワーク/サーバー。

  • あなたがTornadoを使用している場合は、Tornado-Redisあります。これはネイティブのTornadoジェネレータコールを使用しています。そのWebsocket demoは、pub/subと組み合わせて使用​​する方法の例を示しています。

  • Twistedを使用している場合は、txRedisがあります。そこにはpub/sub exampleもあります。

  • Geventと組み合わせたRedis-pyを使用しても問題がなく、Gevent's monkey patchinggevent.monkey.patch_all())を使用できるようです。

+2

これは、チェックマークを付けるべき正しい答えです。私は人々がなぜその輪を再発明するのか分かりません。既存の非同期クライアントが存在し、新しいスレッドを生成することは、そのようなクライアントの存在には本当に必要ではありません。 – securecurve

+0

@securecurve:公正であるために、私はマークされた1年後にその答えを追加しました。しかし、txRedisとbrükva(どちらもTornado-Redisがフォークしています)はどちらも3歳ですから、本当に言い訳ではありません。 – vartec

+0

この回答は質問とは関係ありません。受け入れられた回答で指摘されているように、redisは、聞いているクライアントにメッセージをプッシュします。したがって、メッセージを求める方法はありません。 – Glaslos

1

最も効率的なアプローチは、スレッドベースではなくグリーンレットベースです。 Greenletベースの並行処理フレームワークとして、geventはPythonの世界で既に確立されています。したがって、redis-pyとの統合は素晴らしくなります。それはgithubの上で、この問題で議論されています正確に何である:

https://github.com/andymccurdy/redis-py/issues/310

0

Redisのパブ/サブチャネル上で(リスニング)サブスクライブしているクライアントにメッセージを送信します。あなたが聞いていない場合、あなたはメッセージを逃すでしょう(したがって、ブロッキングコール)。それをノンブロッキングにしたいのであれば、代わりにキューを使うことをお勧めします。あなたはパブリッシュ/サブスクライブを使用する必要がある場合は、リスナーをブロックし、非同期を有することが示唆geventとして使用したキューにメッセージをプッシュし、ノンブロッキングな方法でそのキューからのメッセージを処理するために別の消費者を使用することができます。

9

新しいバージョンのredispyでは、非同期pubsubがサポートされています。詳細はhttps://github.com/andymccurdy/redis-pyを参照してください。 はここにドキュメント自体からの例です:ここでは

while True: 
    message = p.get_message() 
    if message: 
     # do something with the message 
    time.sleep(0.001) # be nice to the system :) 
2

は、スレッドのないノンブロッキングソリューションです:

fd = ps.connection._sock.fileno(); 
rlist,, = select.select([fd], [], [], 0) # or replace 0 with None to block 
if rlist: 
    for rfd in rlist: 
     if fd == rfd: 
      message = ps.get_message() 

ps.get_message()は、自分自身で十分ですが、私は、複数のを待つことができるように、私はこの方法を使用しますfdを赤色の接続の代わりに使用します。

5

redis-pyはノンブロッキングget_message()を使用することをお勧めします。しかし、スレッドを簡単に使用する方法も提供します。

https://pypi.python.org/pypi/redis

メッセージを読むための3つの異なる戦略があります。

シーンの裏側で、​​get_message()はシステムの 'select'モジュールを使用して接続のソケットをすばやくポーリングします。読み込み可能なデータがある場合、get_message()はそれを読み込み、メッセージをフォーマットして返すか、メッセージハンドラに渡します。読み込むデータがない場合、get_message()はただちにNoneを返します。これにより、アプリケーション内の既存のイベントループに統合するのが簡単になります。

while True: 
    message = p.get_message() 
    if message: 
     # do something with the message 
    time.sleep(0.001) # be nice to the system :) 

古いバージョンのredis-pyは、pubsub.listen()でメッセージを読み取ります。 listen()は、メッセージが利用可能になるまでブロックするジェネレータです。アプリケーションがredisから受信したメッセージを受信して​​処理する必要がない場合、listen()は簡単に実行を開始する方法です。

for message in p.listen(): 
    # do something with the message 

3番目のオプションは、別のスレッドでイベントループを実行します。 pubsub.run_in_thread()は新しいスレッドを作成し、イベントループを開始します。スレッドオブジェクトはrun_in_thread()の呼び出し側に返されます。呼び出し元は、thread.stop()メソッドを使用してイベントループとスレッドをシャットダウンできます。背後では、これは別のスレッドで実行されるget_message()のラッパーです。本質的に小さな非ブロッキングイベントループを作成します。 run_in_thread()はオプションのsleep_time引数をとります。指定された場合、イベントループは、ループの各繰り返しで値を指定してtime.sleep()を呼び出します。

注:別のスレッドで実行されているため、登録されたメッセージハンドラでは自動的に処理されないメッセージは処理できません。したがって、メッセージハンドラが添付されていないパターンやチャンネルを購読している場合、redis-pyはrun_in_thread()を呼び出さないようにします。

p.subscribe(**{'my-channel': my_handler}) 
thread = p.run_in_thread(sleep_time=0.001) 
# the event loop is now running in the background processing messages 
# when it's time to shut it down... 
thread.stop() 

ですから、メッセージが届いたかどうかを知りたいときだけGET_MESSAGEをチェックし、あなたの質問に答えます。

関連する問題