2017-02-27 14 views
0

私はrabbitMQキューからすべてのメッセージを取得しようとしています。rabbitMQからすべてのメッセージを取得

const messages = await rabbit.getMessages(outputQueue, false); 

これは、getMessagesメソッドの実現です。問題は、メッセージを3〜5回しか処理せず、 'resolve'を呼び出すことです。しばらくして、残りのメッセージを処理しますが、「解決」はすでに呼び出されており、再度実行することはできません。

const amqp = require('amqplib'); 
. 
. 
let amqpUrl; 
let queueConf; 

const init = (connection, queue) => { 
    amqpUrl =`amqp://${connection.user}:${connection.password}@${connection.host}:${connectio n.port}`; 
    if (connection.vhost) { 
amqpUrl = `amqp://${connection.user}:${connection.password}@${connection.host}:${connection.port}/${connection.vhost}`; 
    } 
    queueConf = queue; 
} 

const getChannel =() => new Promise((resolve) => { 
    amqp.connect(amqpUrl).then((conn) => { 
    conn.createChannel().then((ch) => { 
     ch.prefetch(1000).then(() => resolve(ch)) 
    }) 
    }) 
}) 

module.exports = (connection, queue) => { 
    init(connection, queue); 
    return { 
    getMessages: (queueName, cleanQueue) => new Promise((resolve) => { 
     let messages = []; 
     let i = 1; 
     getChannel().then((ch) => { 
     ch.consume(queueName, (msg) => { 
      messages.push(msg); 
      console.log(msg.content.toString()) 
     }, { noAck: cleanQueue }).then(() => { 
      logger.info(`Retreived ${messages.length} messages from ${queueName}`); 
      resolve(messages) 
     }) 
     }) 
    }) 
    . 
    . 
    }; 
    }; 

ありがとうございます!

+1

証拠:基本的に、あなたはchannel.get()ではなく、メッセージオブジェクトのfalseで解決されるまで、一度に一つのメッセージを得続けます –

答えて

1

このようにすることはできますが、メッセージが消費されるよりも速くキューにメッセージが追加された場合は非常に遅くなり、決して解決されません。 - と約束は間違いなくこの種のもののために適合しない約束はすべてのための解決策ではないことを

getMessages: (queueName, cleanQueue) => { 
    let messages = [] 
    let i = 1 
    return getChannel().then(function getMessage (ch) { 
    return ch.get(queueName, { noAck: cleanQueue }).then((msg) => { 
     if (msg) { 
     messages.push(msg) 
     return getMessage(ch) 
     } else { 
     logger.info(`Retrieved ${messages.length} messages from ${queueName}`) 
     return messages 
     } 
    }) 
    }).catch((err) => { 
    err.consumedMessages = messages 
    return Promise.reject(err) 
    }) 
} 
関連する問題