kafka SimpleConsumerの自動コミットを無効にしたいと思います。私は0.8.1バージョンを使用しています。上位レベルのコンシューマの場合、configオプションは以下のように設定し、consumerConfigで渡すことができます。 kafka.consumer.Consumer.createJavaConsumerConnector(this.consumerConfig);SimpleConsumer kafkaの自動コミットを無効にする方法0.8.1
SimpleConsumerで同じようにするにはどうすればよいですか?主に自動コミットを無効にしたい私はconsumer.propertiesでfalseに自動コミットを設定しようとし、kafkaサーバー、zookeeperとproducerを再起動しました。しかし、それは動作しません。私はこの設定をconsumer.propertiesではなく、コードで適用する必要があると思います。 誰でも助けることができますか?ここで
オフセットコミットする前に、コードがクラッシュした上記の場合は私のコードは
List<TopicAndPartition> topicAndPartitionList = new ArrayList<>();
topicAndPartitionList.add(topicAndPartition);
OffsetFetchResponse offsetFetchResponse = consumer.fetchOffsets(new OffsetFetchRequest("testGroup", topicAndPartitionList, (short) 0, correlationId, clientName));
Map<TopicAndPartition, OffsetMetadataAndError> offsets = offsetFetchResponse.offsets();
FetchRequest req = new FetchRequestBuilder()
.clientId(clientName)
.addFetch(a_topic, a_partition, offsets.get(topicAndPartition).offset(), 100000) .build();
long readOffset = offsets.get(topicAndPartition).offset();
FetchResponse fetchResponse = consumer.fetch(req);
//Consume messages from fetchResponse
Map<TopicAndPartition, OffsetMetadataAndError > requestInfo = new HashMap<> ();
requestInfo.put(topicAndPartition, new OffsetMetadataAndError(readOffset, "metadata", (short)0));
OffsetCommitResponse offsetCommitResponse = consumer.commitOffsets(new OffsetCommitRequest("testGroup", requestInfo, (short)0, correlationId, clientName));
どのように見えるかですが、私はまだ最新の私を作る次回の実行中offsets.get(topicAndPartition).offset()の結果として、オフセットを取得しますコードが実行されるときにオフセットの自動コミットが発生すると考えること。
上記のコードスニペットを追加しました。オフセットをコミットする前にアプリケーションがクラッシュした場合でも、次回の実行時には最新のオフセットを取得します。だから、私は、自動コミットは、明示的に無効にされていないと内部的に起こると思います。 – Vikram
config 'auto.offset.reset'がデフォルトで最大に設定されているため、消費者はZookeeperに初期オフセットが保存されていない場合、オフセットを最大のオフセットにリセットします(バージョン0は常に動物園に保存されます) – amethystic
複数の実行コミットのオフセットが早く成功した場合、私は同じ動作をします。 commitOffsetsはzookeeperに書き込みませんか? – Vikram