2013-09-30 18 views
5

3ノードカフカクラスタのセットアップがあります。私はカフカからのメッセージを読むために嵐を使用しています。私のシステムの各トピックには7つのパーティションがあります。カフカのログにオフセットがありません - シンプルコンシューマーが処理できません

今、私は奇妙な問題に直面しています。 3日前まで、すべてうまくいっていた。しかし、今や私の嵐のトポロジは#1と#4の2つのパーティションからは特に読めません。

私が問題にドリルダウンしようとした私のカフカのログに、これらのパーティションの両方のために、オフセット1は5964511後すなわち不足していることが判明し、次のオフセットが原因オフセット欠落に5964513なく5964512.

です、シンプルコンシューマーは次のオフセットに進むことができません。私は何か間違っているのですか、それとも既知のバグですか?

このような動作の理由は何でしょうか?

私は、有効なオフセットの窓を読み取るために、次のコードを使用しています:

public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, 
           long whichTime, String clientName) { 
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); 
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); 
    requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 100)); 
    OffsetRequest request = new OffsetRequest(requestInfoMap, kafka.api.OffsetRequest.CurrentVersion() , clientName); 
    OffsetResponse response = consumer.getOffsetsBefore(request); 
    long[] validOffsets = response.offsets(topic, partition); 
    for (long validOffset : validOffsets) { 
     System.out.println(validOffset + " : "); 
    } 
    long largestOffset = validOffsets[0]; 
    long smallestOffset = validOffsets[validOffsets.length - 1]; 
    System.out.println(smallestOffset + " : " + largestOffset); 
    return largestOffset; 
} 

をこれは私に次のような出力が得られます。

4529948 : 6000878 

だから、私は提供していますオフセットは、オフセット範囲内です。

答えて

1

後半の答えのために申し訳ありませんが、次が読みオフセットを保持するためにロングインスタンスVARを有し、かつ、返さFetchResponse hasError(あればフェッチを確認するために後にチェックすることによって、このような場合のために...

Iコード)。エラーがあった場合は、次のオフセット値を妥当な値(次のオフセットまたは最後の使用可能なオフセット)に変更して再試行します。

関連する問題