2016-04-06 8 views
1

私は4つのパーティションを持つトピックに100万文字列をプリロードするアプリケーションを持っています。なぜ私のカフカのコミットはすべて実際にコミットしないのですか? (0.9.0.1)

アプリケーションは、手動で割り当てられたパーティションで最大2人の読者を発射:

Reader 1 -> Partition 0 & 1 
Reader 2 -> Partition 2 & 3 

Iを介してバズとパーティションから読み出し、各ConsumerRecordにconsumer.commitAsyncを行う私はこの時点で(コミットのないバッチを読みません、意図的に、私が行動を理解するまで)。

私はそれが呼び出された回数を測定するためにコミット非同期コールバックの内側ごとのトピックカウンターを入れて、合計で100万になります。

アプリケーションが落ち着いて停止した後、私は私のオフセットを見てカフカCLIツールを使用して、私のようなものを得る:これらの2中1のラグが偶然であることを

Group   Topic       Pid Offset   logSize   Lag    Owner 
group1   lowercaseStrings    0 233788   250000   16212   none 
group1   lowercaseStrings    1 249999   250000   1    none 
group1   lowercaseStrings    2 249999   250000   1    none 
group1   lowercaseStrings    3 233788   250000   16212   none 

注 - I時には異なる数字を得ることがあります。

私は、渡された例外に対して非同期コミットコールバック関数を見ていますが、それはありません。私のコードによると、私はcommitAsyncを期待どおりに100万回呼び出しました。

なぜ私はまだこのような遅れを持っていますか?これは何が原因でしょうか?

答えて

0

カフカはバッチでメッセージをフェッチします。 - ローカルにデータがなく、コンシューマが内部的にpoll()を呼び出すと、オフセット状態が更新されます - どちらかがありますあなたのローカルバッチで前進してください。

あなたがcommitAsync()を呼び出すと、あなたはあなたの最後のポーリング()の呼び出しではなく、あなたが内部的に反復することにより、現在では効果的だ1で得たオフセットをコミットします。

あなたは本当に(ポーリングしたい場合は、すべてのメッセージのための)0まで、あなたは(あなたの設定でbatch.size設定)受け入れるバッチのサイズを減らすことによって、この動作を制御することができます。私は、この設定を0にするとどこでも0ラグに達すると期待しています(スケーラブルな使用法については、スループットが破壊されることに注意してください)。