RabbitMQ rpcを設定しようとしています。 1つのキューにリッスンしたい場合は、メッセージを受信したときにreply_toヘッダーで複数のメッセージで指定されている匿名キューに返信します。EventMachine EM :: Iteratorがrabbitmq RPCでブロックされています
私は以下のトールタスクがキューを作成し、EMを使用してい:イテレータはバックreplyt_toがキールーティングで指定されたキューへのメッセージの数を送信するために:
desc "start_consumer", "start the test consumer"
def start_consumer
conf = {
:host => "localhost",
:user => "guest",
:password => "guest",
:vhost => "/",
:logging => true,
:port => 5672
}
# n = 1
AMQP.start(conf) do |connection|
channel = AMQP::Channel.new(connection)
requests_queue = channel.queue("one")
requests_queue.purge
Signal.trap("INT") do
connection.close do
EM.stop{exit}
end
end
channel.prefetch(1)
requests_queue.subscribe(:ack => true) do |header, body|
url_search = MultiJson.decode(body)
EM::Iterator.new(0..5).each do |n, iter|
lead = get_lead(n, (n == 5))
puts "about to publish #{n} message is_last = #{lead.is_last} at #{Time.now}"
AMQP::Exchange.default.publish(
MultiJson.encode(lead),
:immediate => true,
:routing_key => header.reply_to,
:correlation_id => header.correlation_id
)
iter.next
end
end
puts " [x] Awaiting RPC requests"
end
end
コードbeloowがにメッセージを送信上記で指定した待ち行列を作成し、EM :: Iteratorコードによって送信されたメッセージをリスンするために使用される待ち行列を作成します。このキューの名前は、最初のキューのreply_toヘッダーのルーティングキーです。
def publish(urlSearch, routing_key)
EM.run do
corr_id = rand(10_000_000).to_s
requests ||= Hash.new
connection = AMQP.connect(:host => "localhost")
callback_queue = AMQP::Channel.new(connection).queue("", :exclusive => false)
callback_queue.subscribe do |header, body|
lead = safe_json_decode(body)
puts "company = #{lead["company"]} is_last = #{lead["is_last"]} received at #{Time.now}"
if lead["is_last"]
puts "in exit"
connection.close do
EM.stop{exit}
end
end
end
callback_queue.append_callback(:declare) do
AMQP::Exchange.default.publish(MultiJson.encode(urlSearch), :routing_key => routing_key, :reply_to => callback_queue.name, :correlation_id => corr_id)
end
puts "initial message sent"
end
end
上記のコードは、1つの面倒な例外が欲しいときに機能します。何かがEM :: Iteratorコードが非同期に実行されるのをブロックしています。メッセージが送信されたことをEM :: Iteratorコードが完了した後だけです。私は、メッセージが非同期に送信され、各反復後に匿名キューによって処理されるようにします。現時点では、EM :: Iteratorコードが最後の反復を完了した後で、すべてのメッセージが送信されます。
私が間違っていることを誰かが見たり、別のアプローチを提案することはできますか?私はEM :: deferを試して、同じ動作をしました。新しいスレッドをスピンアップ
スレッドを生成する必要がなく、db接続を開始するなどの理由で、EM.deferがより適している可能性があります。 – bbozo