librdkafka(基本となるCライブラリ)は、アプリケーションにメッセージを1つずつ返すだけですが、内部的にはメッセージはブローカから一括して取得されるため、パフォーマンスの低下はありません。メッセージは内部バッファに入れられ、アプリがポーリングするのを待ちます。
調整する構成挙動があります
fetch.wait.max.ms
(デフォルト100)、 fetch.message.max.bytes
を送信するためにデータを蓄積する仲介するために与えられた時間(デフォルト1048576、1ギガバイト)、バッチ queued.max.messages.kbytes
の最大サイズ(デフォルトは1000000)、内部キュー内のデータの最大サイズ。定期的にポーリングしないと、データはキューからパージされず、それ以上データをフェッチできなくなります。
そして他の多くのあなたがここで見つけることができます。https://github.com/edenhill/librdkafka/blob/0.11.0.x/CONFIGURATION.md
あなたは本当にためにデータを処理するためにあなたの方法でデータの配列をしたい場合は、何を行うことができますすることのようなループでは低タイムアウト付きコール世論調査でxメッセージがあるか、またはyミリ秒後にループを止めてコレクションに蓄積します。生成された配列を処理し、ループを繰り返します。
データを1つずつ生成しますが、メッセージはブローカに送信される前に一括処理されます。
メッセージのバッチを返すように下位のCコードを変更できますか? Pythonで反復処理を行い、メッセージを取得するだけでは、C自体からメッセージを返す場合にプロセス全体が遅くなる可能性があるため、より高速に処理できます。 –
これは以前のケースでしたが、ベンチマークされていましたが、割り振りが行われているため、一度にメッセージを返すことと、バッチを返すことのどちらも欠点がありませんでした。あなたはループでpoll(0)を使ってバッチを作成することができます - 私はPythonをよく知っていませんが、GitHubに問題があるかもしれません(またはこれについて議論することができます)。 – Treziac
参考(私はそれをあなたのものと仮定します):https://github.com/confluentinc/confluent-kafka-python/issues/252 – Treziac