2016-08-23 7 views
1

spring-kafkaのドキュメントに基づいて、アノテーションベースの@KafkaListenerを使用してコンシューマを構成しています。私は起動時に、オフセットをゼロに指定しない限り、カフカの消費者は今後のメッセージではなく、既存のものをピックアップしSpring Kafkaすべてのトピックをリッスンし、パーティションオフセットを調整する

  1. -

    は、私が見ることはということです。 (私が望むものにオフセットを指定していないため、これは期待された結果だと理解しています)

  2. トピック+パーティションの組み合わせを指定するオプションがあります。私はこれを行う - 私は明示的に私の消費者が耳を傾けるトピックを指定する必要があります。上記の方法2を使用して

が、これは私の消費者が今どのように見えるかである -

@KafkaListener(id = "{group.id}", 
     topicPartitions = { 
       @TopicPartition(topic = "${kafka.topic.name}", 
         partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")) 
     }, 
     containerFactory = "kafkaListenerContainerFactory") 
public void listen(@Payload String payload, 
        Acknowledgment ack) throws InterruptedException, IOException { 

    logger.debug("This is what we received in the Kafka Consumer = " + payload); 

    idService.process(payload); 

    ack.acknowledge(); 
} 

私は「topicPattern」ワイルドカードまたは「トピック」リストなどを指定するオプションがあることを理解してアノテーション設定の一部ですが、リストされたトピック/トピックパターンのゼロから始まるオフセット値を提供できる場所が表示されません。両方の組み合わせを行う方法はありますか?お知らせ下さい。

答えて

3

トピックとtopicPatternを使用すると(明示的にパーティションを宣言するのではなく)、どのコンシューマインスタンスがどのパーティションを取得するかを決定します。

Kafkaはパーティションを割り当て、初期オフセットはそのグループIDに対して最後にコミットされます。現在、オフセットを変更することはできませんが、seek functionを追加することを検討しています。

あなたは常に最初に利用可能でスタートオフセット、ユニークなグループID(例えばUUID.randomUUID().toString())を使用する場合とカフカは既存それはそのプロパティを使用します。そのグループIDのオフセットがなかったので

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 

を設定開始する場所を決定します。

MANUAL ACKモードを使用することもできますが、決してackを使用することはできません。これは効果的に同じことを行います。

+0

topicPatternはワイルドカードで動作するように設定されていますか?私がtopicPattern = "test *"を提供する場合、私の消費者はtest1、test2、test3のトピックを聞いていますか? – Satya

+1

パターンは正規表現(正規表現)です - 'test。*'を使います。 - 'test *'は 'tes'、' test'、 'testt' - ' tes'の後に0個以上の 't'sが続くことを意味します。 –

関連する問題