0
私はrabbitMQキューからすべてのメッセージを取得しようとしています。rabbitMQからすべてのメッセージを取得
const messages = await rabbit.getMessages(outputQueue, false);
これは、getMessagesメソッドの実現です。問題は、メッセージを3〜5回しか処理せず、 'resolve'を呼び出すことです。しばらくして、残りのメッセージを処理しますが、「解決」はすでに呼び出されており、再度実行することはできません。
const amqp = require('amqplib');
.
.
let amqpUrl;
let queueConf;
const init = (connection, queue) => {
amqpUrl =`amqp://${connection.user}:${connection.password}@${connection.host}:${connectio n.port}`;
if (connection.vhost) {
amqpUrl = `amqp://${connection.user}:${connection.password}@${connection.host}:${connection.port}/${connection.vhost}`;
}
queueConf = queue;
}
const getChannel =() => new Promise((resolve) => {
amqp.connect(amqpUrl).then((conn) => {
conn.createChannel().then((ch) => {
ch.prefetch(1000).then(() => resolve(ch))
})
})
})
module.exports = (connection, queue) => {
init(connection, queue);
return {
getMessages: (queueName, cleanQueue) => new Promise((resolve) => {
let messages = [];
let i = 1;
getChannel().then((ch) => {
ch.consume(queueName, (msg) => {
messages.push(msg);
console.log(msg.content.toString())
}, { noAck: cleanQueue }).then(() => {
logger.info(`Retreived ${messages.length} messages from ${queueName}`);
resolve(messages)
})
})
})
.
.
};
};
ありがとうございます!
証拠:基本的に、あなたは
channel.get()
ではなく、メッセージオブジェクトのfalse
で解決されるまで、一度に一つのメッセージを得続けます –