2012-01-06 4 views
0

RabbitMQ rpcを設定しようとしています。 1つのキューにリッスンしたい場合は、メッセージを受信したときにreply_toヘッダーで複数のメッセージで指定されている匿名キューに返信します。EventMachine EM :: Iteratorがrabbitmq RPCでブロックされています

私は以下のトールタスクがキューを作成し、EMを使用してい:イテレータはバックreplyt_toがキールーティングで指定されたキューへのメッセージの数を送信するために:

desc "start_consumer", "start the test consumer" 
def start_consumer 
    conf = { 
    :host => "localhost", 
    :user => "guest", 
    :password => "guest", 
    :vhost => "/", 
    :logging => true, 
    :port => 5672 
    } 

    # n = 1 

    AMQP.start(conf) do |connection| 

    channel = AMQP::Channel.new(connection) 

    requests_queue = channel.queue("one") 
    requests_queue.purge 

    Signal.trap("INT") do 
     connection.close do 
     EM.stop{exit} 
     end 
    end 

    channel.prefetch(1) 

    requests_queue.subscribe(:ack => true) do |header, body| 
     url_search = MultiJson.decode(body) 

     EM::Iterator.new(0..5).each do |n, iter| 
     lead = get_lead(n, (n == 5)) 

     puts "about to publish #{n} message is_last = #{lead.is_last} at #{Time.now}" 

     AMQP::Exchange.default.publish(
             MultiJson.encode(lead), 
             :immediate => true, 
             :routing_key => header.reply_to, 
             :correlation_id => header.correlation_id 
            ) 

     iter.next 
     end 
    end 

    puts " [x] Awaiting RPC requests" 
    end   
end 

コードbeloowがにメッセージを送信上記で指定した待ち行列を作成し、EM :: Iteratorコードによって送信されたメッセージをリスンするために使用される待ち行列を作成します。このキューの名前は、最初のキューのreply_toヘッダーのルーティングキーです。

def publish(urlSearch, routing_key) 
    EM.run do 
    corr_id = rand(10_000_000).to_s 

    requests ||= Hash.new 

    connection = AMQP.connect(:host => "localhost") 

    callback_queue = AMQP::Channel.new(connection).queue("", :exclusive => false) 

    callback_queue.subscribe do |header, body| 
     lead = safe_json_decode(body) 

     puts "company = #{lead["company"]} is_last = #{lead["is_last"]} received at #{Time.now}"    

     if lead["is_last"] 
     puts "in exit" 
     connection.close do 
      EM.stop{exit} 
     end 
     end 
    end 

    callback_queue.append_callback(:declare) do 
     AMQP::Exchange.default.publish(MultiJson.encode(urlSearch), :routing_key => routing_key, :reply_to => callback_queue.name, :correlation_id => corr_id) 
    end 

    puts "initial message sent" 
    end 
end 

上記のコードは、1つの面倒な例外が欲しいときに機能します。何かがEM :: Iteratorコードが非同期に実行されるのをブロックしています。メッセージが送信されたことをEM :: Iteratorコードが完了した後だけです。私は、メッセージが非同期に送信され、各反復後に匿名キューによって処理されるようにします。現時点では、EM :: Iteratorコードが最後の反復を完了した後で、すべてのメッセージが送信されます。

私が間違っていることを誰かが見たり、別のアプローチを提案することはできますか?私はEM :: deferを試して、同じ動作をしました。新しいスレッドをスピンアップ

答えて

0

は私の問題への答えだった:

Thread.new do 
    5.times do 
    lead = get_lead(n, (n == 5)) 

    puts "message #{n} is_last = #{lead.is_last} at #{Time.now}"; 

    AMQP::Exchange.default.publish(
            MultiJson.encode(lead), 
            :routing_key => header.reply_to, 
            :correlation_id => header.correlation_id 
           ) 

    n += 1 
    sleep(2) 
    end 
end 

新しいスレッドを作成EventMachine反応器が遮断されると、メッセージが非同期に送信され停止します。

+0

スレッドを生成する必要がなく、db接続を開始するなどの理由で、EM.deferがより適している可能性があります。 – bbozo