2017-12-09 11 views
0

私は、3つのパーティションからのメッセージを取得する3つのスプリングカフカコンシューマ(同じグループ)を持っています。私は、これらの消費者のうちの1人が1つのパーティションから読み込みを停止すると検出したい(他の2人の消費者は他の2つのパーティションから読み込みを続ける)。これはこれまでに2回起こっており、検出された場合、再バランスを引き起こすすべての消費者を再起動して修正するのは簡単です。この問題は、どちらの場合でも早く知っておいた方がいいでしょう。だから私はそうのようなListenerContainerIdleEventを使用してみました - ここで春のカフカ消費者が1つのパーティションからメッセージを受信しなくなったときを検出するにはどうすればよいですか?

@EventListener 
public void eventHandler(ListenerContainerIdleEvent event) { 
    LOG.info("idle event fired! listnerId=" + event.getListenerId()); 

    Collection<org.apache.kafka.common.TopicPartition> partitions = event.getTopicPartitions(); 
    partitions.forEach(p -> 
      LOG.info("partition: " + p.partition() + " topic:" + p.topic())); 
} 

は私のテスト結果である -

1)1つのパーティションから1本の消費者の読みは、このイベントではうまく動作します。 3つのパーティションから

2)1消費者の読み取り、すべての3つのパーティションにはメッセージがないだけ とき、このイベントが呼び出されます。 1または2パーティションにメッセージがないが、3番目のパーティションにメッセージがある場合、この イベントは呼び出されません。

1つのパーティションから(何らかの理由で...消費者問題やパーティションから読み込み可能メッセージなし)のメッセージが読まれていないとき、私は通知を受けることができる方法消費者が複数に割り当てられてありますパーティション?

答えて

1

...同時実行が1で、3つのパーティションがある場合は、パーティション

から読み取ることが利用可能なメッセージは、3つのすべてのパーティションは、同じ消費者によって処理されません。コンシューマに複数のトピックが割り当てられていて、特定のトピックからのメッセージを一定期間受信しなかった場合、イベントを公開するフレームワークは現在のところフレームワークには存在しません。

あなたは3に、コンテナの並行性を向上させる場合は、3、消費者になります - パーティションごとに1つ。このイベントは、アイドル状態になると各消費者によって公開されます。同時に、多くの場合、それを呼び出して、3つのスレッドが存在しますので、 リスナースレッドセーフでなければなりません。あなたは、パーティションの数が多い場合

明らかに、これはうまくスケールしません。

何らかの理由

...消費者問題...

そこにメッセージがトピックであり、彼らはそのトピックが割り当てられている消費者によって受信されない場合は、かなり奇妙だろうまだそのコンシューマは他のパーティションからのメッセージを積極的に受信しています。それはカフカの人々の助けを必要とするでしょう。

+0

ゲーリー...説明をありがとう。私は1人の消費者がちょうど1つのパーティションから読んでいたときの動作を観察したので、ListenerContainerIdleEventはこの状態を検出するように動作するはずです。これに対する副問として、テストのために特定のパーティションにメッセージを投稿する方法はありませんでした。これを行う方法はありますか? kafkaコマンドラインプロデューサは--partitionオプションをサポートしていません。 – rmulay

+0

springの 'kafkaTemplate.send(topic、partition、key、value)'を使うことができます。キーを気にしない場合は、そこに 'null'を使用してください。 –

関連する問題