2012-05-09 2 views
2

私はRabbitMQで既に.NETで作業していますが、問題はそれほどありません。今ではnode.jsでrabbit.jsに移動していますが、私はあまりよく知らないです。 rabbit.jsには限られた文書しかありません。私が知っているのは基本的なPUSH/PULLまたはPUB/SUBだけです。今ではREQ/REPをやりたいと思っていました。誰もが少しスニペットを共有することができます。Rabbit.jsでREP/REQを作成するには

ご返信いただければ幸いです。

ベストは、

答えて

14

これはあなたが尋ねた、おそらくよりですが、私が代わりにrabbit.jsとREQ/RESのnode-amqpを使用してRPCを行うため(それはかなり長いですが)snippletを持っています。私が行ったことは、RPCについてRabbitMQ tutorialで見つけたものに似ています

メッセージの内容は、amqpモジュールによってjsonに変換されるオブジェクト(ハッシュ)でなければなりません。

AmqpRpcクラスは、初期化時にamqp接続を受け取り、makeRequestを呼び出してコールバックで応答を待つだけでよいはずです。 応答はごめんなさい機能(ERR、応答)errがタイムアウトエラー

かもしれないの形がそのあなたが求めていない、まさにそれは十分に多分近いです持っています。 また、私はgithubの上の要点としてコードを掲載:https://gist.github.com/2720846

編集: サンプルは、複数の未処理の要求をサポートするように変更。

amqprpc.js

var amqp = require('amqp') 
    , crypto = require('crypto') 

var TIMEOUT=2000; //time to wait for response in ms 
var CONTENT_TYPE='application/json'; 

exports = module.exports = AmqpRpc; 

function AmqpRpc(connection){ 
    var self = this; 
    this.connection = typeof(connection) != 'undefined' ? connection : amqp.createConnection(); 
    this.requests = {}; //hash to store request in wait for response 
    this.response_queue = false; //plaseholder for the future queue 
} 

AmqpRpc.prototype.makeRequest = function(queue_name, content, callback){ 
    var self = this; 
    //generate a unique correlation id for this call 
    var correlationId = crypto.randomBytes(16).toString('hex'); 

    //create a timeout for what should happen if we don't get a response 
    var tId = setTimeout(function(corr_id){ 
    //if this ever gets called we didn't get a response in a 
    //timely fashion 
    callback(new Error("timeout " + corr_id)); 
    //delete the entry from hash 
    delete self.requests[corr_id]; 
    }, TIMEOUT, correlationId); 

    //create a request entry to store in a hash 
    var entry = { 
    callback:callback, 
    timeout: tId //the id for the timeout so we can clear it 
    }; 

    //put the entry in the hash so we can match the response later 
    self.requests[correlationId]=entry; 

    //make sure we have a response queue 
    self.setupResponseQueue(function(){ 
    //put the request on a queue 
    self.connection.publish(queue_name, content, { 
     correlationId:correlationId, 
     contentType:CONTENT_TYPE, 
     replyTo:self.response_queue}); 
    }); 
} 


AmqpRpc.prototype.setupResponseQueue = function(next){ 
    //don't mess around if we have a queue 
    if(this.response_queue) return next(); 

    var self = this; 
    //create the queue 
    self.connection.queue('', {exclusive:true}, function(q){ 
    //store the name 
    self.response_queue = q.name; 
    //subscribe to messages 
    q.subscribe(function(message, headers, deliveryInfo, m){ 
     //get the correlationId 
     var correlationId = m.correlationId; 
     //is it a response to a pending request 
     if(correlationId in self.requests){ 
     //retreive the request entry 
     var entry = self.requests[correlationId]; 
     //make sure we don't timeout by clearing it 
     clearTimeout(entry.timeout); 
     //delete the entry from hash 
     delete self.requests[correlationId]; 
     //callback, no err 
     entry.callback(null, message); 
     } 
    }); 
    return next();  
    }); 
} 

それは以下の見つけることができます使用する方法の小さな例。両方のコードパーツを保存しておきます...

返信するサーバーがない場合は、要求がタイムアウトします。

client.js

//exmaple on how to use amqprpc 
var amqp = require('amqp'); 
var connection = amqp.createConnection({host:'127.0.0.1'}); 

var rpc = new (require('./amqprpc'))(connection); 

connection.on("ready", function(){ 
    console.log("ready"); 
    var outstanding=0; //counter of outstanding requests 

    //do a number of requests 
    for(var i=1; i<=10 ;i+=1){ 
    //we are about to make a request, increase counter 
    outstanding += 1; 
    rpc.makeRequest('msg_queue', {foo:'bar', index:outstanding}, function response(err, response){ 
     if(err) 
     console.error(err); 
     else 
     console.log("response", response); 
     //reduce for each timeout or response 
     outstanding-=1; 
     isAllDone(); 
    }); 
    } 

    function isAllDone() { 
    //if no more outstanding then close connection 
    if(outstanding === 0){ 
     connection.end(); 
    } 
    } 

}); 
私も良い測定のためのサンプルサーバに投げるよ

server.js

//super simple rpc server example 
var amqp = require('amqp') 
    , util = require('util'); 

var cnn = amqp.createConnection({host:'127.0.0.1'}); 

cnn.on('ready', function(){ 
    console.log("listening on msg_queue"); 
    cnn.queue('msg_queue', function(q){ 
     q.subscribe(function(message, headers, deliveryInfo, m){ 
     util.log(util.format(deliveryInfo.routingKey, message)); 
     //return index sent 
     cnn.publish(m.replyTo, {response:"OK", index:message.index}, { 
      contentType:'application/json', 
      contentEncoding:'utf-8', 
      correlationId:m.correlationId 
      }); 
     }); 
    }); 
}); 
+0

WOW!私は感銘を受けて。私はこの問題で私を助けるstackoverflowの誰もここにないだろうと思った。私は彼らの図書館にハッキングすることで回避することができました。しかし、本当にありがとう、これは私が探しているものです。 –

+0

ようこそ。 rabbit.jsではないのに、援助できるようになりました。しかし、警告のビット、これは私のアプリから抽出されたと私は余分な毛羽をすべて取り除くときにそれを台無しにしている可能性があります。しかし、コードは少なくとも私がやった基本的なテストで働いた。 – kmpm

+0

警告に感謝しています。私はそれをテストし、それは働いた!それが大丈夫なら、途中でコードを変更するかもしれません。しかし、もう一度、ありがとう。非常に役立ちます。私は他の人もそれが役に立つと思うことを望む。 –

関連する問題