2016-07-15 8 views
1

私は、次のパッケージに含まれているバージョン0.9のためのカフカの消費者のAPIでOffsetCommitRequestを使用しようとしています: org.apache.kafka.common.requests.OffsetCommitRequestKafkaの消費者にOffsetCommitRequestを送信するには?

このリクエストを送信するためにどのように?これを使用する理想的な方法は何ですか? カフカ自体でオフセットをコミットしたいと思います。私は、バージョン0.9に関連するドキュメントは見つかりませんでした。そのほとんどは0.8.xで利用可能です

また、このリクエストのコンストラクタには、世代ID、メンバーID、および保持時間が必要です。これらのフィールドは何ですか?

+0

Goは、このリンクを介してhttps://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka – Aby

+0

私はすでにやりました。これは古いバージョン用です。 BlockingChannelを使用して0.9で利用できないリクエストを送信します – vamosromil

+0

バージョン0.9のKafkaドキュメントがここにあります:http://kafka.apache.org/090/documentation.html – Aby

答えて

3

手動でオフセットをコミットしたい場合は、多分あなたは

enable.auto.commit=false

とcommitSyncを(使用、消費者のプロパティを設定する)またはカフカの消費者のcommitAsync()メソッド必要があります。 たとえば、すべてのConsumerRecordが処理された後にcommitSync()を呼び出すことができます。 また、各ConsumerRecordを受け取った後でも、必要なTopicPartitionに対してのみコミットできます。このように:

Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>(); 
offsetMap.put(new TopicPartition(someTopic, somePartition), new OffsetAndMetadata(someOffset)); 
kafkaConsumer.commitSync(offsetMap); 
+0

これは私が頼りにした最終的な解決策です。とにかくありがとう:) しかし、私はまだ私の元の質問への答えを持っていると思います。私はこのリクエストオブジェクトが時にはとても便利だと思います。 – vamosromil

関連する問題