2016-09-04 6 views
0

私はカフカ0.10クラスターを持っています。新しい消費者グループIDが最初から始まっていないのはなぜですか?

私はKafkaConsumerと新しいグループIDのトピックを購読しても、レコードは返されませんが、同じグループIDで最初に検索しているConsumerRebalanceListenerでトピックを購読すると、レコードが取得されますトピックで

@Grab('org.apache.kafka:kafka-clients:0.10.0.0') 

import org.apache.kafka.clients.consumer.KafkaConsumer 
import org.apache.kafka.clients.consumer.ConsumerRecords 
import org.apache.kafka.clients.consumer.ConsumerRecord 
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener 
import org.apache.kafka.common.TopicPartition 
import org.apache.kafka.common.PartitionInfo 

Properties props = new Properties() 
props.with { 
    put("bootstrap.servers","***********:9091") 
    put("group.id","script-test-noseek") 
    put("enable.auto.commit","true") 
    put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer") 
    put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer") 
    put("session.timeout.ms",30000) 
} 

KafkaConsumer consumer = new KafkaConsumer(props) 
def topicMap = [:] 
consumer.listTopics().each { topic, partitioninfo -> 
     topicMap[topic] = 0 
} 

topicMap.each {topic, count -> 
    def stopTime = new Date().time + 30_000 
    def stop = false 

    println "Starting topic: $topic" 
    consumer.subscribe([topic]) 
    //consumer.subscribe([topic], new CRListener(consumer:consumer)) 
    while(!stop) { 
     ConsumerRecords<String, String> records = consumer.poll(5_000) 
     topicMap[topic] += records.size() 
     consumer.commitAsync() 
     if (new Date().time > stopTime || records.size() == 0) { 
      stop = true 
     } 
    }  
    consumer.unsubscribe() 
} 

def total = 0 
println "------------------- Results -----------------------" 
topicMap.each { k,v -> 
    if (v > 0) { 
     println "Topic: ${k.padRight(64,' ')} Records: ${v}" 
    } 
    total += v 
} 
println "===================================================" 
println "Total: ${total}" 
def dummy = "Process End" 

class CRListener implements ConsumerRebalanceListener { 
    KafkaConsumer consumer 
    void onPartitionsAssigned(java.util.Collection partitions) { 
     consumer.seekToBeginning(partitions) 
    } 
    void onPartitionsRevoked(java.util.Collection partitions) { 
     consumer.commitSync() 
    } 
} 

コードはGroovy 2.4.xです。そして私はブートストラップサーバーをマスクしました。 コンシューマ購読ラインのリスナーのコメントを外すと、リスナーは私の期待することを行います。しかし、それは私は結果を得ることはありません。

実行ごとにグループIDを変更すると、別の実行が中断されないようにピックアップしないと仮定します。

私は何が間違っているのか分かりません。どんな助けもありがとう。

答えて

2

新しいコンシューマグループIDを使用していて、最初からトピック全体を読みたい場合は、プロパティでパラメータ "auto.offset.reset = earliest"を指定する必要があります。消費者に

Properties props = new Properties() 
props.with { 
    // all other values... 
    put("auto.offset.reset","earliest") 
} 

起動以下の問題が発生した(デフォルト値は「最新」である):のための

  1. 見(有効)(有効)発見されたオフセットが使用group.id
  2. のオフセットをコミット
  3. なし(有効)が発見されたオフセットがない場合、
+0

auto.offset.commitに応じてオフセットを設定しますがありがとうございから、再開マティアス。私はそれをチェックします。 ConsumerRebalanceListenerのオプションがもっと簡単になるかもしれないことをお勧めします。私はそれを手動でコントロールできることを理解しています。しかし、seekLastCommitPosition()も価値があるかもしれません。 –

+0

それはMatthiasを完全に働かせました。ありがとうございました。 –

関連する問題