私はKafkaでいくつかの管理作業を行うシナリオを持っています。Kafkaでの操作を求める
トピックAには10個のパーティションがあります。トピックAからのメッセージを消費する5つの異なるアプリケーションがあり、各アプリケーション(app1、app2、app3、app4、app5)には独自のコンシューマ "group.id"(group1、group2、group3、group4、group5)があります。独自のオフセットで個別に作業します。自動オフセットコミットはtrueに設定されます。各アプリケーションのコンシューマ・グループ内のコンシューマ数は異なります(app1にはgroup1の消費者が10人、app2にはgroup2のコンシューマが5人あります)。
今管理者が望んでいるに、
ケース1:順方向電流オフセット位置(X)から5位置によってAPP2のオフセット。このタスクでは、app2を停止し、app2と同じgroup id(group2)を持つ単一のKafkaConsumer(春ではない)を作成し、新しいオフセットX + 5をコミットしてapp2を開始する必要がありますか?
case-2:トピックAのすべてのアプリケーションのオフセットを現在のオフセット位置(X)から5桁だけ転送します。すべてのアプリケーションが同じオフセットXにあると仮定します。このタスクでは、すべてのアプリケーションを停止し、空のグループid ""を持つKafkaConsumerを作成し、新しいオフセットX + 5をコミットしてすべてのアプリケーションを開始しますか?またはケース1と似ていますが、アプリケーションごとに個別に行いますか?
また、カフカと一緒にやるのが良い方法はありますか?
ので、問題になっているように、+ 5の位置Xに求めることを、私はregisterSeekCallback方法でX + 5のロジックを記述する必要があります。しかし、このメソッド(registerSeekCallback)はいつ呼び出されますか? – user3366706
そして、アプリケーションが停止し、最後にコミットされたオフセットがXであるときに同じX + 5を実行する必要がある場合は、右の質問のように何かする必要がありますか?または? – user3366706
詳細については、前述のクラスのJavaDocsを参照してください。私は確信していません –