しかし、一度消費者については何か例外を得るように見えません 一度カフカがダウンします。 システムを再起動するように消費者に「すべて」尋ねることはできません。この問題を解決する良い方法はありますか?
はい、コンシューマには例外が発生せず、動作は設計通りに動作します。しかし、すべてのコンシューマを再起動する必要はありません。コンシューマがpoll()
メソッド呼び出しを定期的に呼び出すことをあなたのロジックで確認してください。消費者は、たとえクラスタが生存していなくても、それが達成されないように設計されています。実際に何が起こるかを理解するには、次の手順を検討してください。
1:すべてのクラスタが停止しており、アクティブなクラスタはありません。
2:consumer.poll(timeout) // This will be called form you portion of code
3:インサイドKafkaConsumer.java
でpoll()
メソッド呼び出しは、呼び出しのシーケンスに従うことに行われます。
poll() --> pollOnce() --> ensureCoordinatorKnown() --> awaitMetaDataUpdate()
論理的なチェックを内部的に実行した後に呼び出されるメインメソッド呼び出しを強調表示しました。さて、この時点であなたの消費者は、クラスタが再び立ち上がるまで待つでしょう。
4:再びクラスタまたは
5を再起動:消費者が通知され、クラスタがダウンする前に、通常はそれがあったとして、それが再び作業を開始します。
注:コンシューマは最後のオフセットコミットからのメッセージの受信を開始し、正常に受信されたメッセージは複製されません。
説明した動作は(0.9.xバージョン)
で有効です。これはV8.2.1で実行できますか?もしそうなら、これを有効にする方法は? – nikel
あなたは最新の消費者を使っていた新しいシステムだったので、私は残念ですが、私は古い消費者に慣れていないと思いました。 – Nautilus