2016-03-03 11 views
5

カフカパブリッシュサブスクライブシステムを実装します。カフカクラスタの障害を処理する方法

最悪の場合、最悪の場合 - 特定のトピックのすべてのカフカブローカーがダウンした場合、どうなりますか?

私はこれを試しました...メタデータフェッチのデフォルトタイムアウト後にパブリッシャーがそれを検出しました&は成功しなかった場合例外をスローします。

この場合、例外を監視し、Kafkaを修正した後でPublisherを再起動することができます。

しかし、消費者についてはどうでしょうか。カフカが倒れても、例外はありません。私たちは消費者にシステムを再起動するように「すべて」頼むことはできません。この問題を解決する良い方法はありますか?

答えて

2

消費者(0.9.xバージョン)がポーリングされ、クラスタがダウンし、それはあなたがクラスタまでポーリングを保つことができ、次の例外

java.net.ConnectException: Connection refused 

を取得する必要がある場合は、再起動する必要はありません再び帰ってきました消費者は、接続を再確立します。

+0

で有効です。これはV8.2.1で実行できますか?もしそうなら、これを有効にする方法は? – nikel

+0

あなたは最新の消費者を使っていた新しいシステムだったので、私は残念ですが、私は古い消費者に慣れていないと思いました。 – Nautilus

4

しかし、一度消費者については何か例外を得るように見えません 一度カフカがダウンします。 システムを再起動するように消費者に「すべて」尋ねることはできません。この問題を解決する良い方法はありますか?

はい、コンシューマには例外が発生せず、動作は設計通りに動作します。しかし、すべてのコンシューマを再起動する必要はありません。コンシューマがpoll()メソッド呼び出しを定期的に呼び出すことをあなたのロジックで確認してください。消費者は、たとえクラスタが生存していなくても、それが達成されないように設計されています。実際に何が起こるかを理解するには、次の手順を検討してください。

1:すべてのクラスタが停止しており、アクティブなクラスタはありません。

2:consumer.poll(timeout) // This will be called form you portion of code

3:インサイドKafkaConsumer.javapoll()メソッド呼び出しは、呼び出しのシーケンスに従うことに行われます。

poll() --> pollOnce() --> ensureCoordinatorKnown() --> awaitMetaDataUpdate() 

論理的なチェックを内部的に実行した後に呼び出されるメインメソッド呼び出しを強調表示しました。さて、この時点であなたの消費者は、クラスタが再び立ち上がるまで待つでしょう。

4:再びクラスタまたは

5を再起動:消費者が通知され、クラスタがダウンする前に、通常はそれがあったとして、それが再び作業を開始します。

注:コンシューマは最後のオフセットコミットからのメッセージの受信を開始し、正常に受信されたメッセージは複製されません。

説明した動作は(0.9.xバージョン)

関連する問題