2017-11-15 4 views
0

私はkafka-spring-testバージョン2.0.1.RELEASE(最新)のEmbeddedKafkaを使用しています。 私は、1つのテストだけを実行しているときに正しく動作する非常に簡単なテストを行っています。 しかし、私はそれらを次々と実行しているので、2番目のテストが失敗します。 - 消費者doestはメッセージを受け取りません。埋め込みカフカ以降のテストで遅れてコンシューマにメッセージを送信する

public class KafkaControllerTest { 

    private static final String FOO_TOPIC = "fooTopic"; 

    @Rule 
    public KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, FOO_TOPIC); 

    MessageConsumer msgConsumerFoo = mock(MessageConsumer.class); 

    @Before 
    public void before() { 
     assertTrue(embeddedKafka.getBrokerAddresses().length == 1); 
     KafkaController controller = new KafkaController(
       embeddedKafka.getBrokerAddress(0).toString(), 
       new Consumer(FOO_TOPIC, msgConsumerFoo)); 
     MockMvcBuilders.standaloneSetup(controller).build(); 
    } 

    @Test 
    public void kafkaFirstTest() throws Exception { 
     sendMessage(FOO_TOPIC, "foo message"); 

     verify(msgConsumerFoo).consume(any()); 
    } 

    @Test 
    public void kafkaSecondTest() throws Exception { 
     sendMessage(FOO_TOPIC, "foo2 message"); 

     verify(msgConsumerFoo).consume(any()); 
    } 

    void sendMessage(String topic, String notification) throws ExecutionException, InterruptedException { 
     Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka); 
     Producer<Integer, String> producer = new DefaultKafkaProducerFactory<Integer, String>(props).createProducer(); 
     producer.send(new ProducerRecord<>(topic, notification)).get(); 
    } 
} 

、テストクラスのコード:

public class KafkaController { 

    public KafkaController(String brokerAddress, Consumer... consumers) { 
     for (Consumer consumer : consumers) { 
      addTopicListener(brokerAddress, consumer.topic, consumer.messageConsumer); 
     } 
    } 

    private void addTopicListener(String brokerAddress, String topic, MessageConsumer consumer) { 
     HashMap<String, Object> consumerConfig = new HashMap<>(); 
     consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress); 
     consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); 
     DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
       consumerConfig, 
       new StringDeserializer(), 
       new StringDeserializer()); 

     ContainerProperties containerProperties = new ContainerProperties(topic); 
     containerProperties.setMessageListener((MessageListener<String, String>) data -> { 
      consumer.consume(data.value()); 
     }); 
     ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer<>(cf, containerProperties); 
     container.start(); 
    } 

    public interface MessageConsumer { 
     void consume(String message); 
    } 

    public static class Consumer { 
     private final String topic; 
     private final MessageConsumer messageConsumer; 

     public Consumer(String topic, MessageConsumer messageConsumer) { 
      this.topic = topic; 
      this.messageConsumer = messageConsumer; 
     } 
    } 
} 

私はこの問題は、正しくローカルカフカのインスタンスの作業でテストし、消費者としてKafkaEmbeddedによって引き起こされると考えています。

紛失しているものがありますか?私は有用な消費財産を見つけられませんでした。

Kafka-spring-test docは言う:

一般的にテスト間ブローカーの起動/停止を避ける(と各テストのために別のトピックを使用)する@ClassRuleとしてルールを使用することをお勧めします。

しかし、私は両方の@ClassRuleとしてKafkaEmbeddedを作り、テストと、まだ何もで別のトピックを使用してみました。

@Test 
public void kafkaSecondTest() throws Exception { 
    Thread.sleep(1000); 
    sendMessage(FOO_TOPIC, "foo2 message"); 
    Thread.sleep(1000); 

    verify(msgConsumerFoo).consume(any()); 
} 

はい、何とかそれはThread.sleep(1000)を前にして、メッセージを送信した後に、両方追加されている場合にのみ機能します。第二の試験の遅延を追加することができますので、

この問題は、非同期とは何かを持っています。

したがって、KafkaEmbeddedまたは他のコンポーネントがメッセージを送信/使用する準備ができているかどうかを確認するにはどうすればよいですか?

答えて

1

おそらくauto.offset.reset=earliestConsumerConfig.AUTO_OFFSET_RESET_CONFIG)を設定する必要があります。それ以外の場合は、コンテナが完全に開始する前にメッセージを送信することがあります。

もちろん、同じトピックを使用する場合、2番目のコンシューマは2つのメッセージを取得する必要があります。

また、各テストの最後にコンテナを止めることをお勧めします。デバッグロギングも役立ちます。

EDIT

代替試験でメッセージを送信する前に、ContainerTestUtils.waitForAssignment()を使用することです。私たちはフレームワークテストでそれを多くしています。

+0

ありがとうございます。 コンテナを停止する - 良い点、私はそれについて考えなかった。残念ながら、それは問題を解決しません。 しかし、 "最も早い"オフセットを設定することは部分的に機能します。つまり、2番目のコンシューマが2つのメッセージを受け取ると言います。つまり、テストは互いに影響しあいます。私はKafkaEmbeddedを@ClassRuleではなく@Ruleにしておくことでそれを避けることができます。全体的に、それは良い提案です。 'auto.offset.reset = latest'を設定したい場合はどうすればいいですか?私は 'auto.offset.reset'プロパティの値を注入する必要がありますか、それとももっと良い解決策がありますか? – michalbrz

+0

代わりに、 'ContainerTestUtils 'を使うこともできます。waitForAssignment() ';私たちはフレームワークテストでそれを多くしています。 –

関連する問題