1
キューからすべてのメッセージを待ちたい&消費者を停止します。しかし、それは新しいメッセージを待っています。キューが空のときだけメッセージを待つ引数なし既存のメッセージをすべてキューから取得するには
public class Listener {
private Channel channel;
public Listener(boolean futureActionsAllowed) {
this.futureActionsAllowed = futureActionsAllowed;
channel = RabbitMQ.getChannel(TasksStatusOpts.QUEUE_NAME, TasksStatusOpts.DURABLE, TasksStatusOpts.EXCLUSIVE,
TasksStatusOpts.AUTO_DELETE);
}
public void getAndExit() throws IOException, InterruptedException {
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
String consumerTag = channel.basicConsume(TasksStatusOpts.QUEUE_NAME, TasksStatusOpts.AUTO_ACK,
queueingConsumer);
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
if (delivery == null) {
channel.basicCancel(consumerTag);
Connection conn = channel.getConnection();
channel.close();
conn.close();
break;
} else {
String message = new String(delivery.getBody(), Charsets.UTF_8);
handle(message, delivery.getEnvelope());
}
}
}
public static void main(String[] args) throws IOException {
new Listener(false).getAndExit();
}
}
現在のキューカウントを取得するAPI呼び出しがあるかどうかを確認できます。その場合は、アプリケーションを起動するときに、キューの数を読み取り、ループを何回実行してからアプリケーションを停止することができます。またはメッセージ配信時間を取得するAPIがある場合は、アプリケーション開始時間後に配信されるメッセージの取得を開始するときにアプリケーションを停止することができます –
キューからすべてのメッセージを消費して終了する必要がある場合は、キュー内のメッセージの数に依存します。消費者が生産よりも前に開始した場合(わずかな時間でさえ)、この方法は失敗します。プロデューサがそれらの間に少しの遅延を持つメッセージをパブリッシュすると、コンシューマはそれらを十分に読み込みますが、プロデューサがまだ動作しているのにキューは空の状態になります。したがって、メッセージ数ではなくタイムアウトを使用してください。 –