1日に1回トリガーされるバッチジョブがあります。 カフカに関するすべてのメッセージを消費して切断します。
- が
- プロセスのメッセージ処理が正常に完了した場合
- 、オフセットをコミット、その時点でカフカトピックで使用可能なすべてのメッセージを消費する必要があります。
現在、ConsumerRecords.isEmpty()がtrueになるまでwhileループ内のメッセージをポーリングします。 ConsumerRecords.isEmpty()がtrueの場合、その時点でTopicで使用可能なすべてのレコードが消費されたと見なします。アプリケーションはオフセットを維持し、カフカの消費者を閉じます。
メッセージの処理が完了して正常に完了すると、私は新しいKafkaConsumerを作成し、アプリケーションによって維持されるオフセットをコミットします。
注意最初にメッセージを読むために使用したKafkaConsumerを閉じ、コンシューマの再バランスの例外を避けるために別のKafkaConsumerインスタンスを使用してオフセットをコミットします。
トピックに対して最大5kのメッセージが必要です。トピックは分割され、複製されます。
トピックの特定の時点ですべてのメッセージを消費する良い方法はありますか?行方不明、または世話をする必要があるものはありますか?私はループ内のメッセージをpoll()してからポーリングが完了した後にメッセージを処理するので、消費者のリバランスを考慮する必要はないと思います。
私はjava kafka client v0.9を使用しており、上記のシナリオで役立つ場合はv0.10に変更できます。
更新
ありがとう:
AtomicBoolean flag = new AtomicBoolean();
flag.set(true);
while(flag.get()) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(timeout);
if(consumerRecords.isEmpty()) {
flag.set(false);
continue;
}
//if the ConsumerRecords is not empty process the messages and continue to poll()
}
kafkaConsumer.close();
結果がどのようになっているかを明確にすることはできますか?具体的には、1日の結果リスト(1日のレポートのように、他の日のレポートから独立したもの)に結果が追加されるのか、それとも(現在販売されているアイテムの合計を差し引いて在庫レベルを更新するなど)または、他の何か? – Svend
また、5Kの日次入力イベントの性質は何ですか?それらは毎日どのように割り当てられていますか?たとえば、何らかの問題が原因で、バッチが最後の3日間、次回の正常な実行中に正常に実行されなかった場合、15Kメッセージを分割して別々に処理するか、または計算が受け入れられず、まだ処理されているメッセージですか? – Svend
@Svendアプリケーションは日報を生成します。毎日の入力イベントは、第三者のシステムから発生します。バッチが過去3日間正常に実行されなかった場合は、メッセージを分割する必要はなく、処理されていないすべてのメッセージを受信する必要があります。 –