2017-02-01 21 views
3

まだ作成されていない可能性のあるトピックをリッスンして、カフカコンシューマを開始します(ただしトピック自動作成は有効です)。なぜカフカの消費者は、消費に時間がかかりますか?

その後、プロデューサーがそのトピックに関するメッセージを公開しています。

ただし、消費者には通知には少し時間がかかりますこれは正確です:5分。この時点で、コンシューマはパーティションを取り消し、コンシューマ・グループに再び参加します。カフカはグループを再安定させる。消費者対カフカのログのタイムスタンプを見ると、このプロセスは消費者側でインスタンス化されます。

これは予想される動作だと思いますが、これを理解したいと思います。これは実際に(0から1のパーティションまで)再調整されていますか?トピックを先に作成すると、これは起こりませんか?

2017-02-01 08:36:45.692 INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [] for group tps-kafka-partitioning 
2017-02-01 08:36:45.692 INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[] 
2017-02-01 08:36:45.693 INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group tps-kafka-partitioning 
2017-02-01 08:36:45.738 INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group tps-kafka-partitioning with generation 1 
2017-02-01 08:36:45.747 INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [] for group tps-kafka-partitioning 
2017-02-01 08:36:45.749 INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[] 
2017-02-01 08:41:45.540 INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [] for group tps-kafka-partitioning 
2017-02-01 08:41:45.544 INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[] 
2017-02-01 08:41:45.544 INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group tps-kafka-partitioning 

カフカは

[2017-02-01 08:41:45,546] INFO [GroupCoordinator 1001]: Preparing to restabilize group tps-kafka-partitioning with old generation 1 (kafka.coordinator.GroupCoordinator) 
[2017-02-01 08:41:45,546] INFO [GroupCoordinator 1001]: Stabilized group tps-kafka-partitioning generation 2 (kafka.coordinator.GroupCoordinator) 
[2017-02-01 08:41:45,551] INFO [GroupCoordinator 1001]: Assignment received from leader for group tps-kafka-partitioning for generation 2 (kafka.coordinator.GroupCoordinator) 
[2017-02-01 08:42:14,636] INFO [GroupCoordinator 1001]: Preparing to restabilize group tps-kafka-group-id with old generation 1 (kafka.coordinator.GroupCoordinator) 
[2017-02-01 08:42:14,636] INFO [GroupCoordinator 1001]: Stabilized group tps-kafka-group-id generation 2 (kafka.coordinator.GroupCoordinator) 

答えて

4

これはおそらく消費者が話題のメタデータの更新を強制する頻度を制御するパラメータmetadata.max.age.msのデフォルト値によるものですがログに記録されます。

既存のトピックがない消費者を開始すると、ブローカーがこのトピックを自動作成するということですが、これはリーダー選挙などで少し時間がかかるため、消費者がそのトピックのメタデータを要求するとLEADER_NOT_AVAILABLEという警告が表示され、メッセージを取得できません。 上記のタイムアウトに達すると、消費者はメタデータをリフレッシュし、今度は正常にメッセージを読み込み、メッセージの読み取りを開始します。これは、プロデューサーがトピックにメッセージを書き込むことに依存するものではなく、純粋に消費者向けのものです。

たとえば、1000msのタイムアウトでコンシューマを起動すると、メッセージが消費されるまで遅延が大幅に短縮されます。

また、トピックを前面に作成するか、コンシューマの前にプロデューサを開始する場合、この動作はまったく起こりません。

関連する問題