私はkafka-spring-testバージョン2.0.1.RELEASE(最新)のEmbeddedKafka
を使用しています。 私は、1つのテストだけを実行しているときに正しく動作する非常に簡単なテストを行っています。 しかし、私はそれらを次々と実行しているので、2番目のテストが失敗します。 - 消費者doestはメッセージを受け取りません。埋め込みカフカ以降のテストで遅れてコンシューマにメッセージを送信する
public class KafkaControllerTest {
private static final String FOO_TOPIC = "fooTopic";
@Rule
public KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, FOO_TOPIC);
MessageConsumer msgConsumerFoo = mock(MessageConsumer.class);
@Before
public void before() {
assertTrue(embeddedKafka.getBrokerAddresses().length == 1);
KafkaController controller = new KafkaController(
embeddedKafka.getBrokerAddress(0).toString(),
new Consumer(FOO_TOPIC, msgConsumerFoo));
MockMvcBuilders.standaloneSetup(controller).build();
}
@Test
public void kafkaFirstTest() throws Exception {
sendMessage(FOO_TOPIC, "foo message");
verify(msgConsumerFoo).consume(any());
}
@Test
public void kafkaSecondTest() throws Exception {
sendMessage(FOO_TOPIC, "foo2 message");
verify(msgConsumerFoo).consume(any());
}
void sendMessage(String topic, String notification) throws ExecutionException, InterruptedException {
Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
Producer<Integer, String> producer = new DefaultKafkaProducerFactory<Integer, String>(props).createProducer();
producer.send(new ProducerRecord<>(topic, notification)).get();
}
}
、テストクラスのコード:
public class KafkaController {
public KafkaController(String brokerAddress, Consumer... consumers) {
for (Consumer consumer : consumers) {
addTopicListener(brokerAddress, consumer.topic, consumer.messageConsumer);
}
}
private void addTopicListener(String brokerAddress, String topic, MessageConsumer consumer) {
HashMap<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
consumerConfig,
new StringDeserializer(),
new StringDeserializer());
ContainerProperties containerProperties = new ContainerProperties(topic);
containerProperties.setMessageListener((MessageListener<String, String>) data -> {
consumer.consume(data.value());
});
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer<>(cf, containerProperties);
container.start();
}
public interface MessageConsumer {
void consume(String message);
}
public static class Consumer {
private final String topic;
private final MessageConsumer messageConsumer;
public Consumer(String topic, MessageConsumer messageConsumer) {
this.topic = topic;
this.messageConsumer = messageConsumer;
}
}
}
私はこの問題は、正しくローカルカフカのインスタンスの作業でテストし、消費者としてKafkaEmbedded
によって引き起こされると考えています。
紛失しているものがありますか?私は有用な消費財産を見つけられませんでした。
一般的にテスト間ブローカーの起動/停止を避ける(と各テストのために別のトピックを使用)する@ClassRuleとしてルールを使用することをお勧めします。
しかし、私は両方の@ClassRule
としてKafkaEmbedded
を作り、テストと、まだ何もで別のトピックを使用してみました。
@Test
public void kafkaSecondTest() throws Exception {
Thread.sleep(1000);
sendMessage(FOO_TOPIC, "foo2 message");
Thread.sleep(1000);
verify(msgConsumerFoo).consume(any());
}
はい、何とかそれはThread.sleep(1000)
を前にして、メッセージを送信した後に、両方追加されている場合にのみ機能します。第二の試験の遅延を追加することができますので、
この問題は、非同期とは何かを持っています。
したがって、KafkaEmbedded
または他のコンポーネントがメッセージを送信/使用する準備ができているかどうかを確認するにはどうすればよいですか?
ありがとうございます。 コンテナを停止する - 良い点、私はそれについて考えなかった。残念ながら、それは問題を解決しません。 しかし、 "最も早い"オフセットを設定することは部分的に機能します。つまり、2番目のコンシューマが2つのメッセージを受け取ると言います。つまり、テストは互いに影響しあいます。私はKafkaEmbeddedを@ClassRuleではなく@Ruleにしておくことでそれを避けることができます。全体的に、それは良い提案です。 'auto.offset.reset = latest'を設定したい場合はどうすればいいですか?私は 'auto.offset.reset'プロパティの値を注入する必要がありますか、それとももっと良い解決策がありますか? – michalbrz
代わりに、 'ContainerTestUtils 'を使うこともできます。waitForAssignment() ';私たちはフレームワークテストでそれを多くしています。 –