私はRabbitMQで既に.NETで作業していますが、問題はそれほどありません。今ではnode.jsでrabbit.jsに移動していますが、私はあまりよく知らないです。 rabbit.jsには限られた文書しかありません。私が知っているのは基本的なPUSH/PULLまたはPUB/SUBだけです。今ではREQ/REPをやりたいと思っていました。誰もが少しスニペットを共有することができます。Rabbit.jsでREP/REQを作成するには
ご返信いただければ幸いです。
ベストは、
私はRabbitMQで既に.NETで作業していますが、問題はそれほどありません。今ではnode.jsでrabbit.jsに移動していますが、私はあまりよく知らないです。 rabbit.jsには限られた文書しかありません。私が知っているのは基本的なPUSH/PULLまたはPUB/SUBだけです。今ではREQ/REPをやりたいと思っていました。誰もが少しスニペットを共有することができます。Rabbit.jsでREP/REQを作成するには
ご返信いただければ幸いです。
ベストは、
これはあなたが尋ねた、おそらくよりですが、私が代わりにrabbit.jsとREQ/RESのnode-amqpを使用してRPCを行うため(それはかなり長いですが)snippletを持っています。私が行ったことは、RPCについてRabbitMQ tutorialで見つけたものに似ています
メッセージの内容は、amqpモジュールによってjsonに変換されるオブジェクト(ハッシュ)でなければなりません。
AmqpRpcクラスは、初期化時にamqp接続を受け取り、makeRequestを呼び出してコールバックで応答を待つだけでよいはずです。 応答はごめんなさい機能(ERR、応答)errがタイムアウトエラー
かもしれないの形がそのあなたが求めていない、まさにそれは十分に多分近いです持っています。 また、私はgithubの上の要点としてコードを掲載:https://gist.github.com/2720846
編集: サンプルは、複数の未処理の要求をサポートするように変更。
amqprpc.js
var amqp = require('amqp')
, crypto = require('crypto')
var TIMEOUT=2000; //time to wait for response in ms
var CONTENT_TYPE='application/json';
exports = module.exports = AmqpRpc;
function AmqpRpc(connection){
var self = this;
this.connection = typeof(connection) != 'undefined' ? connection : amqp.createConnection();
this.requests = {}; //hash to store request in wait for response
this.response_queue = false; //plaseholder for the future queue
}
AmqpRpc.prototype.makeRequest = function(queue_name, content, callback){
var self = this;
//generate a unique correlation id for this call
var correlationId = crypto.randomBytes(16).toString('hex');
//create a timeout for what should happen if we don't get a response
var tId = setTimeout(function(corr_id){
//if this ever gets called we didn't get a response in a
//timely fashion
callback(new Error("timeout " + corr_id));
//delete the entry from hash
delete self.requests[corr_id];
}, TIMEOUT, correlationId);
//create a request entry to store in a hash
var entry = {
callback:callback,
timeout: tId //the id for the timeout so we can clear it
};
//put the entry in the hash so we can match the response later
self.requests[correlationId]=entry;
//make sure we have a response queue
self.setupResponseQueue(function(){
//put the request on a queue
self.connection.publish(queue_name, content, {
correlationId:correlationId,
contentType:CONTENT_TYPE,
replyTo:self.response_queue});
});
}
AmqpRpc.prototype.setupResponseQueue = function(next){
//don't mess around if we have a queue
if(this.response_queue) return next();
var self = this;
//create the queue
self.connection.queue('', {exclusive:true}, function(q){
//store the name
self.response_queue = q.name;
//subscribe to messages
q.subscribe(function(message, headers, deliveryInfo, m){
//get the correlationId
var correlationId = m.correlationId;
//is it a response to a pending request
if(correlationId in self.requests){
//retreive the request entry
var entry = self.requests[correlationId];
//make sure we don't timeout by clearing it
clearTimeout(entry.timeout);
//delete the entry from hash
delete self.requests[correlationId];
//callback, no err
entry.callback(null, message);
}
});
return next();
});
}
それは以下の見つけることができます使用する方法の小さな例。両方のコードパーツを保存しておきます...
返信するサーバーがない場合は、要求がタイムアウトします。
client.js
//exmaple on how to use amqprpc
var amqp = require('amqp');
var connection = amqp.createConnection({host:'127.0.0.1'});
var rpc = new (require('./amqprpc'))(connection);
connection.on("ready", function(){
console.log("ready");
var outstanding=0; //counter of outstanding requests
//do a number of requests
for(var i=1; i<=10 ;i+=1){
//we are about to make a request, increase counter
outstanding += 1;
rpc.makeRequest('msg_queue', {foo:'bar', index:outstanding}, function response(err, response){
if(err)
console.error(err);
else
console.log("response", response);
//reduce for each timeout or response
outstanding-=1;
isAllDone();
});
}
function isAllDone() {
//if no more outstanding then close connection
if(outstanding === 0){
connection.end();
}
}
});
私も良い測定のためのサンプルサーバに投げるよ
server.js
//super simple rpc server example
var amqp = require('amqp')
, util = require('util');
var cnn = amqp.createConnection({host:'127.0.0.1'});
cnn.on('ready', function(){
console.log("listening on msg_queue");
cnn.queue('msg_queue', function(q){
q.subscribe(function(message, headers, deliveryInfo, m){
util.log(util.format(deliveryInfo.routingKey, message));
//return index sent
cnn.publish(m.replyTo, {response:"OK", index:message.index}, {
contentType:'application/json',
contentEncoding:'utf-8',
correlationId:m.correlationId
});
});
});
});
https://github.com/demchenkoe/node-amqp-rpc
ノードを持つrabbitMQよりrpcを実行するのに適していることがわかりました。強くお勧めします。
WOW!私は感銘を受けて。私はこの問題で私を助けるstackoverflowの誰もここにないだろうと思った。私は彼らの図書館にハッキングすることで回避することができました。しかし、本当にありがとう、これは私が探しているものです。 –
ようこそ。 rabbit.jsではないのに、援助できるようになりました。しかし、警告のビット、これは私のアプリから抽出されたと私は余分な毛羽をすべて取り除くときにそれを台無しにしている可能性があります。しかし、コードは少なくとも私がやった基本的なテストで働いた。 – kmpm
警告に感謝しています。私はそれをテストし、それは働いた!それが大丈夫なら、途中でコードを変更するかもしれません。しかし、もう一度、ありがとう。非常に役立ちます。私は他の人もそれが役に立つと思うことを望む。 –