2017-12-22 12 views
6

カフカ・コンシューマ・グループ(javaで実装されている)がブローカからのメッセージを一貫して逃していることに注目されています。 kafkaコンソールのコンシューマーを介してデバッグの最初の行として、私はブローカーで利用可能なメッセージを見ることができます。Java Kafkaコンシューマ・グループが少数のメッセージを消費していない

カフカブローカーのバージョン:0.10.1.0

カフカのクライアントバージョン:

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.9.0.1</version> 
</dependency> 

カフカの消費者の設定:

Properties props = new Properties(); 
props.put("bootstrap.servers","broker1,broker2,broker3"); 
props.put("group.id", "myGroupIdForDemo"); 
props.put("key.deserializer", StringDeserializer.class.getName()); 
props.put("value.deserializer", StringDeserializer.class.getName()); 
props.put("heartbeat.interval.ms", "25000"); 
props.put("session.timeout.ms", "30000"); 
props.put("max.poll.interval.ms", "300000"); 
props.put("max.poll.records", "1"); 
props.put("zookeeper.session.timeout.ms", "120000"); 
props.put("zookeeper.sync.time.ms", "10000"); 
props.put("auto.commit.enable", "false"); 
props.put("auto.commit.interval.ms", "60000"); 
props.put("auto.offset.reset", "earliest"); 
props.put("consumer.timeout.ms", "-1"); 
props.put("rebalance.max.retries", "20"); 
props.put("rebalance.backoff.ms", "6000"); 

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 

EDIT - いくつかのより多くの情報の追加

私はいくつかのより多くの情報次のように追加します。 合計6つのパーティションがあります。ただし、同じコンシューマ・グループIDを持つトピックの合計消費者数は40です。私は、34人の消費者が遊んでいて何もしないことを理解しています。

しかし、消費者がブローカーが死んだと見なしてパーティションを再割り当てする程度にハートビートを送信できない場合、アイドル状態の消費者のいずれかがメッセージを消費する機会を得るのでしょうか?このメッセージが消費されないという問題は、特定のパーティションでのみ認識されます。私は、メッセージが同じパーティションから配信/消費されないことを意味します。

何か助けていただければ幸いです。ありがとう。

+0

最初/最後/ランダムメッセージがありませんか? – Natalia

+0

'myGroupIdForDemo'で実行中の他のプロセス/スレッドはありますか?ランダムな値を割り当てると、その動作は持続しますか? –

+0

@Natalia、それはランダムなメッセージです。 – thomas

答えて

2

a)Kafkaでもメッセージが存在しない可能性があります。その場合、メッセージサイズがkafkaブローカーの設定で許可されている最大メッセージサイズを超えていないかどうかを確認します。

b)消費者がKafkaインスタンス1と2-dインスタンスに接続していない場合、2-d kafkaからのメッセージを見逃す可能性があります。したがって、コンシューマ接続文字列のすべてのブローカを指定します。

3)メッセージがカフカに存在し、接続している場合は、メッセージを逆シリアル化することができないため、別のデシリアライザを試してみるか、文字列ではなく、何が起こるかを見てメッセージが消費されますか?はいの場合、文字列への変換に問題があります。

4)メッセージは、同じグループIDで作業している別の作業コンシューマが「盗まれた」可能性があり、一意のグループIDを選択する可能性があります。

5) 消費メッセージの表示に使用するロガーは何ですか?あなたがロガーの問題だと思わない?

6) すべてのメッセージを消費する前に、消費者を殺したり停止したりすることはありますか?

7) コンシューマのメモリ制限のために消費しても問題はありませんか? Meyは-Xmxを増やしてください。 (ヒープサイズ)

+0

こんにちは@Vladimir、7つのポイントのどれも真実を保持します。しかし、私が疑うところは、パーティションの再割り当てが発生し(ブローカーにハートビートを送信する消費者の遅れのために)、何かが起こって理解する必要があるということです。 – thomas

+0

上記のケースでは、コンシューマが切断される可能性があるため、再調整が行われます。 コンシューマの切断を防ぐ必要があります。私はあなたの消費者heratbeatが理由で失敗すると思う:いくつかのメッセージは単にあなたの消費者を殺す。メッセージサイズになるかもしれませんが、デシリアライザのエラーなどがあります。 –

+0

応答に感謝します。エラーはありませんが、一部のメッセージの処理に時間がかかります。処理はI/Oバウンド処理、すなわち、 DBでルックアップしたり、他のサービスとやりとりしたりして、再バランスを引き起こしています。再バランスを避けるためにハートビートを送ることを管理できれば、問題は解決するだろうと思いますか? – thomas

関連する問題