私は特定のオフセットを求めることができます。特定のオフセットで消費者を止める方法はありますか?言い換えれば、私の与えられたオフセットまで消費する。私が知る限り、カフカはそのような機能を提供していません。私が間違っていれば私を修正してください。カフカ消費者を特定のオフセットで止める方法はありますか?
例:パーティションのオフセットは1〜10です。私は3-8から消費したいだけです。 8番目のメッセージを消費した後、プログラムは終了する必要があります。
私は特定のオフセットを求めることができます。特定のオフセットで消費者を止める方法はありますか?言い換えれば、私の与えられたオフセットまで消費する。私が知る限り、カフカはそのような機能を提供していません。私が間違っていれば私を修正してください。カフカ消費者を特定のオフセットで止める方法はありますか?
例:パーティションのオフセットは1〜10です。私は3-8から消費したいだけです。 8番目のメッセージを消費した後、プログラムは終了する必要があります。
はい、kafkaはこの機能を提供していませんが、コンシューマーコードでこれを実現できます。あなたはこれを制御するためにcommitSync()
を試してみることができます。
ます。public void commitSync(地図オフセット)
話題とパーティションの指定したリストの指定されたオフセットをコミットします。 これはKafkaへのオフセットをコミットします。このAPIを使用してコミットされたオフセットは、すべての再バランス後および最初のフェッチ時に使用されます。そのため、Kafka以外にオフセットを格納する必要がある場合は、このAPIを使用しないでください。コミットされたオフセットは、アプリケーションが消費する次のメッセージ、つまりlastProcessedMessageOffset + 1でなければなりません。
これは同期コミットであり、コミットが成功するか回復不能なエラーが発生するまでブロックされます。発呼者)。
このような何か:
while (goAhead) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
if (record.offset() > OFFSET_BOUND) {
consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset())));
goAhead = false;
break;
}
process(record);
}
}
あなたは上記のコードではfalseに "enable.auto.commit" に設定する必要があります。あなたのケースでは、OFFSET_BOUNDは8に設定することができます。コミットされたオフセットはあなたの例ではわずか9なので、次回は消費者がこの位置からフェッチします。
パーティションオフセットが連続している(つまり、ログが圧縮されていない)と仮定すると、コンシューマはコンシューマを構成することができます(max.poll.records
configを使用)。これはあなたが望むオフセットで停止することができます。