2017-11-28 3 views
1

私はKafkaでいくつかの管理作業を行うシナリオを持っています。Kafkaでの操作を求める

トピックAには10個のパーティションがあります。トピックAからのメッセージを消費する5つの異なるアプリケーションがあり、各アプリケーション(app1、app2、app3、app4、app5)には独自のコンシューマ "group.id"(group1、group2、group3、group4、group5)があります。独自のオフセットで個別に作業します。自動オフセットコミットはtrueに設定されます。各アプリケーションのコンシューマ・グループ内のコンシューマ数は異なります(app1にはgroup1の消費者が10人、app2にはgroup2のコンシューマが5人あります)。

今管理者が望んでいるに、

ケース1:順方向電流オフセット位置(X)から5位置によってAPP2のオフセット。このタスクでは、app2を停止し、app2と同じgroup id(group2)を持つ単一のKafkaConsumer(春ではない)を作成し、新しいオフセットX + 5をコミットしてapp2を開始する必要がありますか?

case-2:トピックAのすべてのアプリケーションのオフセットを現在のオフセット位置(X)から5桁だけ転送します。すべてのアプリケーションが同じオフセットXにあると仮定します。このタスクでは、すべてのアプリケーションを停止し、空のグループid ""を持つKafkaConsumerを作成し、新しいオフセットX + 5をコミットしてすべてのアプリケーションを開始しますか?またはケース1と似ていますが、アプリケーションごとに個別に行いますか?

また、カフカと一緒にやるのが良い方法はありますか?

答えて

1

ConsumerSeekAwareはお手伝いしますか?

、リスナーが次のメソッドを持っているConsumerSeekAwareを実装する必要があり追求するために:

void registerSeekCallback(ConsumerSeekCallback callback); 

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback); 

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback); 

任意に、実行時に求める適切なスレッドのregisterSeekCallbackからのコールバックリファレンスを使用します。

https://docs.spring.io/spring-kafka/docs/2.0.1.RELEASE/reference/html/_reference.html#seek

+0

ので、問題になっているように、+ 5の位置Xに求めることを、私はregisterSeekCallback方法でX + 5のロジックを記述する必要があります。しかし、このメソッド(registerSeekCallback)はいつ呼び出されますか? – user3366706

+0

そして、アプリケーションが停止し、最後にコミットされたオフセットがXであるときに同じX + 5を実行する必要がある場合は、右の質問のように何かする必要がありますか?または? – user3366706

+0

詳細については、前述のクラスのJavaDocsを参照してください。私は確信していません –