消費者/労働者がある場合は、を消費することを確認したいと思います。私はお送りしようとしています。PikaまたはRabbitMQでは、消費者が現在消費しているかどうかを確認するにはどうすればよいですか?
任意のワーカーがない場合、私はいくつかの労働者を開始(消費者や出版社の両方が単一のマシン上にある)、その後メッセージを公開して行くでしょう。
connection.check_if_has_consumers
のような機能があれば、私は多少このようにそれを実装するだろう -
import pika
import workers
# code for publishing to worker queue
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# if there are no consumers running (would be nice to have such a function)
if not connection.check_if_has_consumers(queue="worker_queue", exchange=""):
# start the workers in other processes, using python's `multiprocessing`
workers.start_workers()
# now, publish with no fear of your queues getting filled up
channel.queue_declare(queue="worker_queue", auto_delete=False, durable=True)
channel.basic_publish(exchange="", routing_key="worker_queue", body="rockin",
properties=pika.BasicProperties(delivery_mode=2))
connection.close()
をしかし、私はナキウサギでcheck_if_has_consumers
機能を持つ任意の関数を見つけることができません。
pikaを使用してこれを達成する方法はありますか?またはおそらく話すことによって〜うさぎで直接ですか?
私はちょうど私が完全にわからないが、私は実際にそれが彼らにメッセージをディスパッチないのでは、消費者の数を認識しているであろうのRabbitMQは、異なるキューに加入考えるとのACK
を受け入れますその任意のヘルプ場合は、ここで
は私が書いたworkers.pyコードは、ある...任意のヘルプは大歓迎です... 3時間前のRabbitMQで始まった....
import multiprocessing
import pika
def start_workers(num=3):
"""start workers as non-daemon processes"""
for i in xrange(num):
process = WorkerProcess()
process.start()
class WorkerProcess(multiprocessing.Process):
"""
worker process that waits infinitly for task msgs and calls
the `callback` whenever it gets a msg
"""
def __init__(self):
multiprocessing.Process.__init__(self)
self.stop_working = multiprocessing.Event()
def run(self):
"""
worker method, open a channel through a pika connection and
start consuming
"""
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
channel.queue_declare(queue='worker_queue', auto_delete=False,
durable=True)
# don't give work to one worker guy until he's finished
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='worker_queue')
# do what `channel.start_consuming()` does but with stopping signal
while len(channel._consumers) and not self.stop_working.is_set():
channel.transport.connection.process_data_events()
channel.stop_consuming()
connection.close()
return 0
def signal_exit(self):
"""exit when finished with current loop"""
self.stop_working.set()
def exit(self):
"""exit worker, blocks until worker is finished and dead"""
self.signal_exit()
while self.is_alive(): # checking `is_alive()` on zombies kills them
time.sleep(1)
def kill(self):
"""kill now! should not use this, might create problems"""
self.terminate()
self.join()
def callback(channel, method, properties, body):
"""pika basic consume callback"""
print 'GOT:', body
# do some heavy lifting here
result = save_to_database(body)
print 'DONE:', result
channel.basic_ack(delivery_tag=method.delivery_tag)
EDIT:より良いアプローチは、一緒に来ていない限り、私はので、ここで前方に移動する必要が
が、私は取るつもりな回避策で、
ので、RabbitMQのはこれらを持っていますHTTP management apisは、management pluginをオンにした後、HTTPのapisページの途中にあります。
/api/connections - 開いているすべての接続のリスト。
/api/connections/name - 個々の接続。それを削除すると接続が切断されます。私は私の労働者を接続して、私のが異なる接続名/ユーザーによっての両方を生成する場合ワーカー接続が開いている場合
ので、私はそこ(...チェックできるようになります労働者が死ぬと問題になるかもしれません...)
より良い解決策を待っています...
EDIT:
がちょうどRabbitMQのドキュメントでこれを見つけましたが、これはPythonで行うのはハックのようになります。
[email protected]:~$ sudo rabbitmqctl -p vhostname list_queues name consumers
Listing queues ...
worker_queue 0
...done.
ので、私のような何かを行うことができ、
subprocess.call("echo password|sudo -S rabbitmqctl -p vhostname list_queues name consumers | grep 'worker_queue'")
ハッキー...まだピカはこれを行うためのいくつかのpython関数を持っています...
ありがとう、