2016-03-22 13 views
2

を追加しました。..カフカの消費者は、私はカフカサーバからのメッセージ(トピック)を消費するKafkaConsumerを使用してい

  • それは消費者のコードを開始する前に作成したトピックのため正常に動作します...

しかし、問題は動的に作成されたトピック(コンシューマーコードが開始された後のことを言いたい)がAPIに動的トピックの作成をサポートすると言われている場合は機能しません。

使用カフカ版:0.9.0.1ここで

https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

は... JAVAコードである

Properties props = new Properties(); 
    props.put("bootstrap.servers", "localhost:9092"); 
    props.put("group.id", "test"); 
    props.put("enable.auto.commit", "false"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
    Pattern r = Pattern.compile("siddu(\\d)*"); 

    consumer.subscribe(r, new HandleRebalance()); 
    try { 
     while(true) { 
      ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); 
      for (TopicPartition partition : records.partitions()) { 
       List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); 
       for (ConsumerRecord<String, String> record : partitionRecords) { 
        System.out.println(partition.partition() + ": " +record.offset() + ": " + record.value()); 
       } 
       long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); 

       consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); 
      } 
     } 
    } finally { 
     consumer.close(); 
    } 

注:私のトピック名が.. を正規表現にマッチしていると私は消費者を再起動した場合、それがプッシュされたメッセージを読み始めますトピックに...

すべてのヘルプは本当にあなたが飼育係にフックすることができます...

答えて

2

を高く評価しています。 the sample codeをチェックしてください。本質的には、動物園のノード/brokers/topicsにウォッチャーを作成します。新しい子供がここに追加されると、新しいトピックが追加され、ウォッチャーがトリガーされます。

+0

urの返信と助けてくれてありがとう...基本的に私はこれを達成するためにKafkaConsumerのAPIを使いたいと思っていました。私はそれを自分で解決しました。 – siddu

+0

これはどのように解決されましたか?私も同じ問題があります。 – madlad

+0

@sidduこの問題をどのように解決したか教えていただけますか? – bhspencer

5

これに対する答えは、Apacheのカフカメールアーカイブにあります。

コンシューマは、トピックメタデータを取得する頻度を基本的に制御する設定オプション "metadata.max.age.ms" をサポートしています。 デフォルトでは、これはかなり高い(5分)と設定されています。つまり、通常の 式に一致する新しいトピックを見つけるまでには最大で になります。これを低く設定すると、トピックをすばやく見つけることができます。

そうすることができますあなたの小道具で:

props.put("metadata.max.age.ms", 5000); 

これは、あなたの消費者は新しい話題について5秒ごとを見つけることになります。

関連する問題