私はRabbitMQに少数のメッセージを送信しますが、コンシューマは2つしか消費しません。RabbitMQコンシューマは、送信されたメッセージの一部のみを消費します
@Component
public class Producer implements CommandLineRunner {
private final RabbitTemplate rabbitTemplate;
private final Consumer receiver;
private final ConfigurableApplicationContext context;
public Producer(Consumer receiver, RabbitTemplate rabbitTemplate,
ConfigurableApplicationContext context) {
this.receiver = receiver;
this.rabbitTemplate = rabbitTemplate;
this.context = context;
}
private List<Message> extractphoneNumberNumbers(String path){
List<Message> messages = new ArrayList();
try{
BufferedReader fileReader = new BufferedReader(
new InputStreamReader(new ClassPathResource(path).getInputStream()));
JsonReader reader = new JsonReader(fileReader);
Gson gson = new GsonBuilder().create();
messages = new Gson().fromJson(reader, new TypeToken<List<Message>>() {}.getType());
}catch(Exception e){
System.out.println(e.getMessage());
}
return messages;
}
public void run(String... args) throws Exception {
List<Message> messagesToPublish = extractphoneNumberNumbers("phoneNumbers");
AtomicInteger messageId = new AtomicInteger();
for(Message message : messagesToPublish){
System.out.println("Sent " + message.toString());
rabbitTemplate.convertAndSend(App.queueName, new Message(messageId.incrementAndGet(), message.getPhoneNumber()));
}
receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
context.close();
}
}
@Component
public class Consumer {
private CountDownLatch latch = new CountDownLatch(1);
private HashMap<String, Integer> phoneCountryCode = new HashMap<String, Integer>();
private int i = 0;
public void receiveJsonMessages(Message message){
System.out.println("Received " + message.toString());
phoneCountryCode.put(message.getCountryCode(), phoneCountryCode.getOrDefault(message.getCountryCode(), 0) + 1);
for(Map.Entry<String, Integer> entry : phoneCountryCode.entrySet()){
System.out.println(entry.getKey() + " " + entry.getValue());
}
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
}
Received 5 - +40722579707 - RO
Sent 0 - +447984627687 - GB
RO 1
Received 6 - +40722579717 - RO
Sent 0 - +447984627657 - GB
RO 2
Sent 0 - +447984627667 - GB
Sent 0 - +447984627677 - GB
Sent 0 - +40722579707 - RO
Sent 0 - +40722579717 - RO
Sent 0 - +40722579727 - RO
目標はグループにあり、国によって電話番号を数える:私のプロデューサー&消費者のためのコードの下にして下さい。私はちょうどその時点でRabbitMQで遊んでいて、経験がないので、グループ化は...今はばかげている。
なぜ、7ではなく2つのメッセージしか受信できないのですか?
編集:私は、労働者(生産者)が完成すると豆が止まるという事実によると信じています。コンシューマーが終了した後、これをどのように停止するように設定できますか?
なぜあなたのプライベートCountDownLatchラッチ=新しいCountDownLatch(1)ですか?あなたが7つのメッセージを期待するとき、1ですか?ここのラッチの目的は何ですか? – Imran
私は、メッセージが受信されたことを知らせるために、各メッセージにCountDownLatchを使用しています。 –
消費者の数を生成されたメッセージの数と同じに設定すると、アプリケーションは正しい数を出力しますが、これは各消費者が1つのメッセージしか消費しないことを意味しているようです。どの消費者がすべてのメッセージを消費するように設定するには? –