2017-09-16 3 views
0

1日に1回トリガーされるバッチジョブがあります。 カフカに関するすべてのメッセージを消費して切断します。

  1. プロセスのメッセージ処理が正常に完了した場合
  2. 、オフセットをコミット、その時点でカフカトピックで使用可能なすべてのメッセージを消費する必要があります。

現在、ConsumerRecords.isEmpty()がtrueになるまでwhileループ内のメッセージをポーリングします。 ConsumerRecords.isEmpty()がtrueの場合、その時点でTopicで使用可能なすべてのレコードが消費されたと見なします。アプリケーションはオフセットを維持し、カフカの消費者を閉じます。

メッセージの処理が完了して正常に完了すると、私は新しいKafkaConsumerを作成し、アプリケーションによって維持されるオフセットをコミットします。

注意最初にメッセージを読むために使用したKafkaConsumerを閉じ、コンシューマの再バランスの例外を避けるために別のKafkaConsumerインスタンスを使用してオフセットをコミットします。

トピックに対して最大5kのメッセージが必要です。トピックは分割され、複製されます。

トピックの特定の時点ですべてのメッセージを消費する良い方法はありますか?行方不明、または世話をする必要があるものはありますか?私はループ内のメッセージをpoll()してからポーリングが完了した後にメッセージを処理するので、消費者のリバランスを考慮する必要はないと思います。

私はjava kafka client v0.9を使用しており、上記のシナリオで役立つ場合はv0.10に変更できます。

更新

ありがとう:

AtomicBoolean flag = new AtomicBoolean(); 
flag.set(true); 

while(flag.get()) { 

ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(timeout); 

if(consumerRecords.isEmpty()) { 
    flag.set(false); 
    continue; 
} 
    //if the ConsumerRecords is not empty process the messages and continue to poll() 
} 
kafkaConsumer.close(); 
+0

結果がどのようになっているかを明確にすることはできますか?具体的には、1日の結果リスト(1日のレポートのように、他の日のレポートから独立したもの)に結果が追加されるのか、それとも(現在販売されているアイテムの合計を差し引いて在庫レベルを更新するなど)または、他の何か? – Svend

+0

また、5Kの日次入力イベントの性質は何ですか?それらは毎日どのように割り当てられていますか?たとえば、何らかの問題が原因で、バッチが最後の3日間、次回の正常な実行中に正常に実行されなかった場合、15Kメッセージを分割して別々に処理するか、または計算が受け入れられず、まだ処理されているメッセージですか? – Svend

+0

@Svendアプリケーションは日報を生成します。毎日の入力イベントは、第三者のシステムから発生します。バッチが過去3日間正常に実行されなかった場合は、メッセージを分割する必要はなく、処理されていないすべてのメッセージを受信する必要があります。 –

答えて

0

あなたは)(ポーリングする呼び出しの後、あなたが原因max.pollにその瞬間のトピックで利用可能なすべてのメッセージを読んでいると仮定することはできません消費者の.records設定パラメータ。これは、単一のpoll()によって返されるレコードの最大数で、デフォルト値は500です。つまり、その瞬間にトピック内に600個のメッセージがある場合は、すべてのメッセージを読むためにpoll()を2回呼び出す必要があります(ただし、他のメッセージが届く可能性があると考えてください)。 私が理解していないもう一つの理由は、異なる消費者を使用してオフセットをコミットする理由です。あなたが話している消費者の再バランスの例外は何ですか?

+0

私は質問で言ったように、私は一度poll()をやりません。私はループでそれを行います。私はより明確になるようにコードで質問を更新しました。 –

+0

別のKafkaConsumerをコミットのオフセットに使用する理由は、メッセージを消費すると、ポーリングを停止してメッセージの処理を開始するため、メッセージを消費するために使用したKafkaConsumerがハートビートを送信せず、私がオフセットをコミットするために同じKafkaConsumerを使用しようとすると、死んだと考えられ、再調整が行われます。 メッセージが消費者になったら、私はKafkaConsumerを閉じてメッセージを処理し、メッセージ処理が成功した場合にのみオフセットをコミットするために新しいKafkaConsumerを作成します。 –

+0

Kafka 0.10.1.0から、ハートビートが変更され、バックグラウンドスレッドで実行されるようになりました。クライアントが死亡し、消費者グループから離脱したことを考慮しなくても、長時間ポーリングをやめることができます。 Btwにmax.poll.interval.msという名前の新しいタイムアウトがあります(デフォルトは5分).5分以内にポーリングを再コールする必要があります。 0.10.1.0のリリースノートの詳細:https://kafka.apache.org/documentation/#upgrade_1010_notable – ppatierno

関連する問題