2017-11-02 11 views
0

私はdocsを読んでいて、max.poll.interval.msというプロパティを見つけましたが、私が必要とする設定ではないようです。カフカの消費者が受け取ったデータを絞り込む方法はありますか?

基本的には、n秒ごとにレコードをポーリングするように消費者に指示するには、min.poll.interval.msのようなものが必要です。

max.poll.recordsと組み合わせて、私のサービスが適切な負荷量を処理していることを保証することができます。

答えて

1

このようには動作しません。

新しいレコードがある場合は、そのレコードを得るために、定期的に(ループで)Consumer.poll(...)を呼び出す必要があります。

あなたがレコード処理を行うと、同じスレッドで(投票を)recevingた場合、処理は時間がかかりすぎる場合は、その後、あなたの消費者は、消費者のグループの外にスローされますと、別のパーティションを取得します。


代わりに、kafka-streamsを使用したくない場合もあります。同じapplication idと異なるインスタンス上のストリームアプリケーションを起動すると、負荷分散のいくつかの種類を提供します。

+0

この答えは、パーティションごとに1つの消費者とのOKであることを前提としています。それはそうではないなら、あなたは、何もより多くの仕事を意味し、自分でオフセットを管理(およびそのパーティションの複数のコンシューマ間でデータを同期させるオフセット)する必要があります。 –

関連する問題