2017-08-24 12 views
-1

私はApache camelでアプリケーションを書いています。私はいくつかのカフカのトピックからキャメルカフカのコンポーネントを介してメッセージを消費しており、クラッシュ/再起動が発生した場合の回復のためにデータベースにダンプします。以下はラクダであるURIカフカコンシューマのメッセージを回復する方法

カフカ:?autoCommitEnable = falseを&のgroupId = R & keySerializerClass = org.apache.kafka.common.serialization.StringSerializer & serializerClass = org.apache.kafka.common.serialization.StringSerializer &トピック=

私のユースケースがある - 私はカフカからの何らかのメッセージ(複数可)を消費しているが、回復のためにデータベースに同じことをダンプと同じで、すべての失われたメッセージを取得する方法をhappens.Nowをクラッシュことができませんでしたアプリケーションを再起動した後のコンシューマグループID? ありがとう

+0

ようこそスタックオーバーフロー。投稿のガイドラインをお読みください。あなたが試したことを示すサンプルコードを提供できれば助けになります。 – catbadger

答えて

0

アプリケーションを再起動した後、同じコンシューマグループIDのすべての失われたメッセージを取得する方法はありますか?

実際にkafkaは、あなたのアプリケーションでオフセットをコミットする場合、コンシューマオフセットを格納します。したがって、アプリケーションを再起動すると、カフカに保存されている最後のオフセットからのメッセージが消費されます。

あなたは、私はまた、このhttps://github.com/apache/camel/blob/camel-2.18.2/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.javaを見つけた真のAutocommitEnable = OR

を設定することができます。

いくつかの作品のコードがあります

if (endpoint.getConfiguration().isAutoCommitEnable() != null 
         && !endpoint.getConfiguration().isAutoCommitEnable()) { 
         long partitionLastoffset = partitionRecords.get(partitionRecords.size() - 1).offset(); 
         consumer.commitSync(Collections.singletonMap(
          partition, new OffsetAndMetadata(partitionLastoffset + 1))); 
        } 

キャメルも、あなたはAutocommitEnableを設定していないこのの世話をします。

+0

返信のためにGuangshengZuoに感謝します。しかし、ラクダアプリケーションからの消費をうまく使いこなすにはどうすればいいですか? – Rajat

+0

autoCommitEnable = trueに設定すると、オフセットがコミットされます。 – GuangshengZuo

関連する問題