3ノードカフカクラスタのセットアップがあります。私はカフカからのメッセージを読むために嵐を使用しています。私のシステムの各トピックには7つのパーティションがあります。カフカのログにオフセットがありません - シンプルコンシューマーが処理できません
今、私は奇妙な問題に直面しています。 3日前まで、すべてうまくいっていた。しかし、今や私の嵐のトポロジは#1と#4の2つのパーティションからは特に読めません。
私が問題にドリルダウンしようとした私のカフカのログに、これらのパーティションの両方のために、オフセット1は5964511後すなわち不足していることが判明し、次のオフセットが原因オフセット欠落に5964513なく5964512.
です、シンプルコンシューマーは次のオフセットに進むことができません。私は何か間違っているのですか、それとも既知のバグですか?
このような動作の理由は何でしょうか?
私は、有効なオフセットの窓を読み取るために、次のコードを使用しています:
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 100));
OffsetRequest request = new OffsetRequest(requestInfoMap, kafka.api.OffsetRequest.CurrentVersion() , clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
long[] validOffsets = response.offsets(topic, partition);
for (long validOffset : validOffsets) {
System.out.println(validOffset + " : ");
}
long largestOffset = validOffsets[0];
long smallestOffset = validOffsets[validOffsets.length - 1];
System.out.println(smallestOffset + " : " + largestOffset);
return largestOffset;
}
をこれは私に次のような出力が得られます。
は4529948 : 6000878
だから、私は提供していますオフセットは、オフセット範囲内です。