2017-08-20 10 views
0

私はRabbitMQに少数のメッセージを送信しますが、コンシューマは2つしか消費しません。RabbitMQコンシューマは、送信されたメッセージの一部のみを消費します

@Component 
public class Producer implements CommandLineRunner { 

private final RabbitTemplate rabbitTemplate; 
private final Consumer receiver; 
private final ConfigurableApplicationContext context; 

public Producer(Consumer receiver, RabbitTemplate rabbitTemplate, 
     ConfigurableApplicationContext context) { 
    this.receiver = receiver; 
    this.rabbitTemplate = rabbitTemplate; 
    this.context = context; 
} 

private List<Message> extractphoneNumberNumbers(String path){ 
    List<Message> messages = new ArrayList(); 
    try{ 
     BufferedReader fileReader = new BufferedReader(
        new InputStreamReader(new ClassPathResource(path).getInputStream())); 
     JsonReader reader = new JsonReader(fileReader); 
     Gson gson = new GsonBuilder().create(); 
     messages = new Gson().fromJson(reader, new TypeToken<List<Message>>() {}.getType()); 
    }catch(Exception e){ 
     System.out.println(e.getMessage()); 
    } 
    return messages; 
} 

public void run(String... args) throws Exception { 
    List<Message> messagesToPublish = extractphoneNumberNumbers("phoneNumbers"); 
    AtomicInteger messageId = new AtomicInteger(); 
    for(Message message : messagesToPublish){ 
     System.out.println("Sent " + message.toString()); 
     rabbitTemplate.convertAndSend(App.queueName, new Message(messageId.incrementAndGet(), message.getPhoneNumber())); 
    } 
    receiver.getLatch().await(10000, TimeUnit.MILLISECONDS); 

    context.close(); 
} 

} 


@Component 
public class Consumer { 
private CountDownLatch latch = new CountDownLatch(1); 
private HashMap<String, Integer> phoneCountryCode = new HashMap<String, Integer>(); 
private int i = 0; 

public void receiveJsonMessages(Message message){ 
    System.out.println("Received " + message.toString()); 
    phoneCountryCode.put(message.getCountryCode(), phoneCountryCode.getOrDefault(message.getCountryCode(), 0) + 1); 

    for(Map.Entry<String, Integer> entry : phoneCountryCode.entrySet()){ 
     System.out.println(entry.getKey() + " " + entry.getValue()); 
    } 

    latch.countDown(); 
} 

public CountDownLatch getLatch() { 
    return latch; 
} 
} 

Received 5 - +40722579707 - RO 
Sent 0 - +447984627687 - GB 
RO 1 
Received 6 - +40722579717 - RO 
Sent 0 - +447984627657 - GB 
RO 2 
Sent 0 - +447984627667 - GB 
Sent 0 - +447984627677 - GB 
Sent 0 - +40722579707 - RO 
Sent 0 - +40722579717 - RO 
Sent 0 - +40722579727 - RO 

目標はグループにあり、国によって電話番号を数える:私のプロデューサー&消費者のためのコードの下にして下さい。私はちょうどその時点でRabbitMQで遊んでいて、経験がないので、グループ化は...今はばかげている。

なぜ、7ではなく2つのメッセージしか受信できないのですか?

編集:私は、労働者(生産者)が完成すると豆が止まるという事実によると信じています。コンシューマーが終了した後、これをどのように停止するように設定できますか?

+0

なぜあなたのプライベートCountDownLatchラッチ=新しいCountDownLatch(1)ですか?あなたが7つのメッセージを期待するとき、1ですか?ここのラッチの目的は何ですか? – Imran

+0

私は、メッセージが受信されたことを知らせるために、各メッセージにCountDownLatchを使用しています。 –

+0

消費者の数を生成されたメッセージの数と同じに設定すると、アプリケーションは正しい数を出力しますが、これは各消費者が1つのメッセージしか消費しないことを意味しているようです。どの消費者がすべてのメッセージを消費するように設定するには? –

答えて

0

ここでは、https://www.rabbitmq.com/tutorials/tutorial-one-spring-amqp.htmlのサンプルに従います。重要な部分は

@RabbitHandler 
public void receive(String in) { 
    System.out.println(" [x] Received '" + in + "'"); 
} 

方法は、シーンの背後にあるループ内で実行され、それが利用可能な今までにときにメッセージを消費しています。上記のようなコンテキストを本当に閉じるには、消費されるメッセージの数を知り、必要な数のメッセージでCountDownLatchを設定する必要があります。これまでにメッセージを受信すると、カウントダウンしてメインスレッドのラッチを待つことができます。しかし、これはすべてのメッセージを受け取ることを保証するものではなく、ネットワーク障害など何かが起きる可能性があるという保証はありません。その後、ラッチ待機時にタイムアウトを設定することができます。

+0

キューに送信されたメッセージの数にコンシューマの数が一致しない限り、コンシューマは間違った数のメッセージを消費します。 –

+0

おそらく間違っています。 – Imran

+0

自分のアプリケーションコードも追加しました。あなたは一度見てみませんか? –

関連する問題