KafkaConsumer
のすべてのコンシューマーレコードがフェッチされるまで続けることが重要なユースケースがあります。このユースケースでは、パイプラインに入ることはありません。絶対的に確実にフェッチすることは何もないことを保証するための適切な方法は何ですか?Kafka - 空になるまで消費する
答えて
Kafkaは無限のデータストリームを処理するように設計されているため、「すべて消費する」とは誰も何らかの期間(1分)、1時間などの間にデータを送信しないことを意味します。
あなたは(擬似コード)のようなものを使用することができます。
int emptyCount = 0;
while (true) {
records = Consumer.poll(500);
if (records.empty()) {
emptyCount++;
if (emptyCount >= 100) {
break;
}
continue;
}
emptyCount = 0;
...process records...
}
あなたは空のサイクルの世論調査&数のチューニングタイムアウトが必要な待機時間に到達することができます。
これはうまくいくかもしれません。今私はseekToBeginning/seekToEndを見ていて、2つが同じで、ストリームが空であるとみなしているが、これは意味的に正しくないかもしれないと計算するためにpositionを使用しています。 –
この処理と長いタイムアウトでポーリングを行うことの意味の違いは何ですか?このチェックが行われている間、カフカには何も入力されていません。 –
タイムアウトがheartbeatより長い 'poll'を呼び出すと、消費者は死んだとみなされます。 –
- 1. clj-kafka - 消費者が空
- 2. CKANでKafkaを消費する方法
- 3. Kafka 10の消費者からコミットしないメッセージを消費する
- 4. Kafka消費者グループオフセット保存
- 5. mulithreaded kafka消費者、ノーシュアエント例外なし
- 6. kafkaのプロデューサーから消費者にオブジェクトを消費する方法は?
- 7. Kafka消費者が空のイテレータを返す
- 8. kafka MirrorMaker:コンシューマスレッドkafka-mirrorで消費されるブローカパーティションがありません
- 9. なぜカフカの消費者は、消費に時間がかかりますか?
- 10. Kafkaの消費者にOffsetCommitRequestを送信するには?
- 11. kafka Javaクライアントは消費しません - consumer.pollでハングアップします
- 12. 消費者コミットオフセット/ v2/kafka /(クラスタ)/コンシューマ
- 13. kafkaログ圧縮データを消費
- 14. Kafka消費者別の構成
- 15. Kafka HAの消費者設定
- 16. 春のブート - 雲母のKafka消費者
- 17. kafka消費者登録リスト(動物園)
- 18. kafkaがJavaプログラムで遅延を消費する方法
- 19. Apache Kafkaで追加されたパーティションを消費する
- 20. kafkaにはサポートできる消費者数に上限がありますか?
- 21. Kafkaは逆順でメッセージを消費します
- 22. react-kafka:なぜパブリッシャーは消費者に「行く」?
- 23. 飼い葉桶でカフカの消費者パスが空ですか?
- 24. カフカ消費者がデータを消費しない
- 25. kafka-0.9は消費者データではありません
- 26. トピックがKafkaの消費者によって消費されたかどうかを確認する方法
- 27. KAFKA REST APIを使用してJSONメッセージを消費する
- 28. Kafka KStream - 消費者の遅れを測定する
- 29. go中のKafka Avroメッセージを消費する
- 30. Kafka 0.10.2消費者数が重複しています
これは正しい方法ですか?あなたはバッチ1にストリーミングソリューションを作っているようですね? –
ゲートウェイが完了し、メッセージがまだCassandraに届いていない場合は、最後に発行されたリビジョン番号を見つけるためにキュー内のメッセージを消費します。 –
新しいデータが追加されていないことがわかっている場合、 'Consumer#endOffsets'でログの終わりを取得し、' Consumer#position'が終わりに達すると読み取りを終了できます。 –