私は3つのパーティションを持つトピックに接続された単一のカフカコンシューマを持っています。カフカからレコードを取得するとすぐに、私はオフセットとパーティションをキャプチャしたいと思います。再起動時に私はカフカのドキュメントからカフカ同じオフセットから再起動
を相殺最後の読み取りから、消費者の位置を復元したいと思います:
各レコードは、独自に付属しているので、あなた自身が、あなただけの次の操作を行う必要がありオフセット管理するために、オフセット:
設定のenable.auto.commit = falseを
はオフセットあなた 位置を保存するために、各ConsumerRecordを提供を使用してください。
再起動時にseek (TopicPartition、long)を使用してコンシューマの位置を復元します。ここで
私のサンプルコードです:
constructor{
load data into offsetMap<partition,offset>
initFlag=true;
}
Main method
{
ConsumerRecords<String, String> records = consumer.poll(100);
if(initFlag) // is this correct way to override offset position?
{
seekToPositions(offsetMap);
initFlag=false;
}
while(!shutdown)
{
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
getOffsetPositions();// dump offsets and partitions to db/disk
}
}
}
//get current offset and write to a file
public synchronized Map<Integer, Long> getOffsetPositions() throws Exception{
Map<Integer, Long> offsetMap = new HashMap<Integer, Long>();
//code to put partition and offset into map
//write to disk or db
}
} // Overrides the fetch offsets that the consumer
public synchronized void seekToPositions(Map<Integer, Long> offsetMap) {
//code get partitions and offset from offsetMap
consumer.seek(partition, offset);
}
はこれが行うには正しい方法は何ですか?もっと良い方法がありますか?
を使用することを検討してくださいお勧めしますが、その前に指摘したように、私は、手動でパーティションを処理していますはい、アウトハンドリング自動パーティショニングを指していただき、ありがとうございます、私しかいません1コンシューマが3つのパーティションすべてに接続しているので、そのことでうまくいくはずです –