私は、3つのパーティションからのメッセージを取得する3つのスプリングカフカコンシューマ(同じグループ)を持っています。私は、これらの消費者のうちの1人が1つのパーティションから読み込みを停止すると検出したい(他の2人の消費者は他の2つのパーティションから読み込みを続ける)。これはこれまでに2回起こっており、検出された場合、再バランスを引き起こすすべての消費者を再起動して修正するのは簡単です。この問題は、どちらの場合でも早く知っておいた方がいいでしょう。だから私はそうのようなListenerContainerIdleEventを使用してみました - ここで春のカフカ消費者が1つのパーティションからメッセージを受信しなくなったときを検出するにはどうすればよいですか?
@EventListener
public void eventHandler(ListenerContainerIdleEvent event) {
LOG.info("idle event fired! listnerId=" + event.getListenerId());
Collection<org.apache.kafka.common.TopicPartition> partitions = event.getTopicPartitions();
partitions.forEach(p ->
LOG.info("partition: " + p.partition() + " topic:" + p.topic()));
}
は私のテスト結果である -
1)1つのパーティションから1本の消費者の読みは、このイベントではうまく動作します。 3つのパーティションから
2)1消費者の読み取り、すべての3つのパーティションにはメッセージがないだけ とき、このイベントが呼び出されます。 1または2パーティションにメッセージがないが、3番目のパーティションにメッセージがある場合、この イベントは呼び出されません。
1つのパーティションから(何らかの理由で...消費者問題やパーティションから読み込み可能メッセージなし)のメッセージが読まれていないとき、私は通知を受けることができる方法消費者が複数に割り当てられてありますパーティション?
ゲーリー...説明をありがとう。私は1人の消費者がちょうど1つのパーティションから読んでいたときの動作を観察したので、ListenerContainerIdleEventはこの状態を検出するように動作するはずです。これに対する副問として、テストのために特定のパーティションにメッセージを投稿する方法はありませんでした。これを行う方法はありますか? kafkaコマンドラインプロデューサは--partitionオプションをサポートしていません。 – rmulay
springの 'kafkaTemplate.send(topic、partition、key、value)'を使うことができます。キーを気にしない場合は、そこに 'null'を使用してください。 –