spring-kafkaのドキュメントに基づいて、アノテーションベースの@KafkaListenerを使用してコンシューマを構成しています。私は起動時に、オフセットをゼロに指定しない限り、カフカの消費者は今後のメッセージではなく、既存のものをピックアップしSpring Kafkaすべてのトピックをリッスンし、パーティションオフセットを調整する
-
は、私が見ることはということです。 (私が望むものにオフセットを指定していないため、これは期待された結果だと理解しています)
トピック+パーティションの組み合わせを指定するオプションがあります。私はこれを行う - 私は明示的に私の消費者が耳を傾けるトピックを指定する必要があります。上記の方法2を使用して
が、これは私の消費者が今どのように見えるかである -
@KafkaListener(id = "{group.id}",
topicPartitions = {
@TopicPartition(topic = "${kafka.topic.name}",
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))
},
containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload String payload,
Acknowledgment ack) throws InterruptedException, IOException {
logger.debug("This is what we received in the Kafka Consumer = " + payload);
idService.process(payload);
ack.acknowledge();
}
私は「topicPattern」ワイルドカードまたは「トピック」リストなどを指定するオプションがあることを理解してアノテーション設定の一部ですが、リストされたトピック/トピックパターンのゼロから始まるオフセット値を提供できる場所が表示されません。両方の組み合わせを行う方法はありますか?お知らせ下さい。
topicPatternはワイルドカードで動作するように設定されていますか?私がtopicPattern = "test *"を提供する場合、私の消費者はtest1、test2、test3のトピックを聞いていますか? – Satya
パターンは正規表現(正規表現)です - 'test。*'を使います。 - 'test *'は 'tes'、' test'、 'testt' - ' tes'の後に0個以上の 't'sが続くことを意味します。 –