カフカアプリケーションでユニットテストを実行する必要があります。ユニットテストのためのkafkaトピックをクリアする
今私の問題は、テストの間にすべてのトピックをクリアしたいのですが、私はどのようにわからないのですか。
これは私の一時的な解決策です。テストごとに生成されたすべてのメッセージをコミットし、すべてのテストコンシューマを同じコンシューマグループに配置します。
override protected def afterEach():Unit={
val cleanerConsumer= newConsumer(Seq.empty)
val topics=cleanerConsumer.listTopics()
println("pulisco")
cleanerConsumer.subscribe(topics.keySet())
cleanerConsumer.poll(100)
cleanerConsumer.commitSync()
cleanerConsumer.close()
}
これは機能しません。理由はわかりません。
たとえば、テスト内に新しいコンシューマを作成すると、messages
には前のテストで生成されたメッセージが含まれます。
val consumerProbe = newConsumer(SMSGatewayTopic)
val messages = consumerProbe.poll(1000)
どうすればこの問題を解決できますか?
カフカは、永続的なメッセージストアで、それは消費者が消費を開始するためのオフセットを決定できます。あなたはあなたが落ちた最後のオフセットを覚えておいて、その後にそれを消費し始めるだけです。 –
たとえば、次のように埋め込みブローカーを使用してください。https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java –
なぜこの質問はJavaでタグ付けされていますか?結果として、それは非常にJava固有の検索で現れます。それほど役に立たない。 – user1283068