2016-08-11 20 views
0

私はspring-kafkaとspring-kafka-testバージョン1.0.2.RELEASEを使用しています。初めてカフカ消費者がレコードを取得すると、多くのレコードのうちの1レコードしか取得できないのですか?

私のテストでは、私のアプリケーションは、KafkaTemplateとほとんどのデフォルトの設定を使ってEmbeddedKafkaインスタンスの1つのTopicPartionに100レコードを連続して送ります。

私はKafkaインスタンスからレコードを取得し、それらがすべて送信されたことを確認するために、KafkaTestUtils.getRecords(コンシューマ)メソッドを使用します。

初めてgetRecordsを呼び出すと、1つのレコードしか受け取りません。私は再びそれを呼び出すと、私は明示的にTopicPartitionの先頭に消費者の位置を設定し、getRecordsを呼び出す場合、私はすべて100

はなぜgetRecordsは一つだけになるだろう得る他の99

を取得します最初に録音しますか?消費者に明示的にseekToBeginningを呼び出すことによって、100を一度に取得するより良い方法はありますか?

答えて

0

競合状態の可能性が最も高い - 消費者はpoll()に座っており、ブローカーは到着するとすぐに最初のメッセージを送信します。

kafka docsのプロパティfetch.min.bytesfetch.max.wait.msを参照してください。

fetch.min.bytesは、デフォルトでは1です。あなたはまた、getRecords()を呼び出す前KafkaTemplateをINGのflush()を試みることができる

EDIT

しかし、テストでは、すべてのメッセージを1つのフェッチで取得することに本当に頼りすぎてはいけません。

+0

「KafkaTemplate」をフラッシュしてから、 'getRecords()'を呼び出す前に5秒。 'fetch.min.bytes does 'の値を増やすと' getRecords'への最初の呼び出しで返されたレコードの総数が増えます。私はすべてのメッセージがブローカーに送信されるまでに5秒かかると思います。次の80メッセージは、最初のメッセージの直後に行われる次の 'getRecords'呼び出しですべて利用可能です。消費者が読む前にすべてのメッセージを保証するため、または消費者が利用可能なすべてのメッセージを読むことを保証するために行うことができる他のものはありますか? –

+0

'flush()'が役に立たなかったのはちょっと驚きましたが、最後の編集で言ったように、タイミングだけに頼るとテストが脆弱になるので、 'fetch.min.bytes'をおそらく大規模な 'fetch.max.wait.ms'が信頼できるテストを行う唯一の方法でしょう。オーバーヘッドのあるメッセージ100バイトが正確に何バイトであるのか分からない限り、テストは長くなりますが、信頼できる(カフカがオーバーヘッドのサイズを変更するまで))。 –

+0

ええ、私はそこに待っていてはいけません。私は、メッセージがまだカフカに飛んでいるかどうかという問題かどうかをテストしようとしていました。消費者が最初に呼び出されたときのように、すべてのメッセージは既にそこにあるはずです。しかし、初めて、消費者は 'fetch.min.bytes'の値を満たすのに十分なメッセージを読んでいるように見えます.2番目の呼び出しでは、サイズに関係なく利用可能な他の多くのメッセージが読み込まれます。なぜ最初の呼び出しは 'fetch.min.bytes'の値に依存しますが、2番目の呼び出しは依存しません。 –

0

タイミングの問題のように聞こえます。最初にpoll()を呼び出したときに、1つのメッセージしか利用できなかった可能性があります。この方法では、取り出されるメッセージの数は保証されません。コードを書くときには、一度にXレコードを受け取ると仮定してはいけません。テスト目的のために、すべてを100にポーリングするまで、受信ループを実行することができる、Kafka 0.10 max.poll.recordsのコンシューマープロパティがあります。

関連する問題