2017-09-12 8 views
0

私はKafkaConsumerを持っており、手動でパーティションを割り当てています。パーティションを配布するには、追加のパーティションを検出するために一定の間隔でconsumer.partitionsFor(topicId)を使用します。ジョブは永遠に実行され、このケースをサポートしたいからです。ただし、コンシューマを再起動しない限り、これは常にパーティションの初期リストを返します。Apache Kafkaで追加されたパーティションを消費する

消費者から追加されたパーティションを検出する方法はありますか?何をポーリングするのか聞きますか?

+0

私は自動割り当てをテストしましたが、追加されたパーティションも検出していないようです。 – Oliv

+0

自動割り当てについてのコードを表示してください。はい、手動でこの変更を検出することはできませんが、 'subscribe'がそれを処理できると思います。 – GuangshengZuo

+0

これは 'consumer.subscribe(singletonList(" my_topic "))'です。次に、 'consumer.assignment()'の出力をチェックし、パーティションを追加しても変更されません。 – Oliv

答えて

0

KafkaConsumerは、configuration property "metadata.max.age.ms"(デフォルトは5分)です。つまり、consumer.partitionsForを呼び出すたびに、キャッシュされたコピーが返され、新しいメタデータだけが取得されます。

プロパティを0に設定するたびに、新しいメタデータがフェッチされます。

+0

kafkaの問題を参照してくださいhttps://issues.apache.org/jira/browse/KAFKA-5881 – Oliv

-1

assignment()メソッドでクエリを実行すると、これがパーティションの再割り当てのプロセスにある可能性があります。 5秒間スリープ状態にしてから、これをチェックしてテストすることができます。

現在、このコンシューマに割り当てられているパーティションのセットを取得します。 assign(Collection)を使用してパーティションを直接割り当てることによってサブスクリプションが発生した場合、割り当てられたパーティションと同じパーティションが返されます。 トピックサブスクリプションを使用した場合、現在コンシューマに割り当てられているトピックパーティションのセットが表示されます(割り当てがまだ行われていない場合、またはパーティションが再割り当て中の場合はありません)

+0

私は少なくとも30秒待った – Oliv

+0

は、このグループの唯一の消費者ですか? – GuangshengZuo

+0

2つのjvmで2つのコンシューマが出力されますが、両方に 'assignment()'の出力が表示されます – Oliv

関連する問題