2017-08-28 10 views
2

私はカフカからのメッセージを読もうとしているので、私はカフカからのメッセージを読むための簡単な消費者を書いています。コンフルエントなkafka pythonのバッチメッセージを読むには?

While True: 
     message = consumer.poll(timeout=1.0) 
     # i am doing something with messages 

上記メッセージタイプのコード出力はメッセージオブジェクトです。私はどのようにメッセージの配列として得ることができますか?

ありますか?

注:消費者向けの基本設定は基本的ではありません。

答えて

5

librdkafka(基本となるCライブラリ)は、アプリケーションにメッセージを1つずつ返すだけですが、内部的にはメッセージはブローカから一括して取得されるため、パフォーマンスの低下はありません。メッセージは内部バッファに入れられ、アプリがポーリングするのを待ちます。

調整する構成挙動があります

fetch.wait.max.ms(デフォルト100)、 fetch.message.max.bytesを送信するためにデータを蓄積する仲介するために与えられた時間(デフォルト1048576、1ギガバイト)、バッチ queued.max.messages.kbytesの最大サイズ(デフォルトは1000000)、内部キュー内のデータの最大サイズ。定期的にポーリングしないと、データはキューからパージされず、それ以上データをフェッチできなくなります。

そして他の多くのあなたがここで見つけることができます。https://github.com/edenhill/librdkafka/blob/0.11.0.x/CONFIGURATION.md


あなたは本当にためにデータを処理するためにあなたの方法でデータの配列をしたい場合は、何を行うことができますすることのようなループでは低タイムアウト付きコール世論調査でxメッセージがあるか、またはyミリ秒後にループを止めてコレクションに蓄積します。生成された配列を処理し、ループを繰り返します。

データを1つずつ生成しますが、メッセージはブローカに送信される前に一括処理されます。

+0

メッセージのバッチを返すように下位のCコードを変更できますか? Pythonで反復処理を行い、メッセージを取得するだけでは、C自体からメッセージを返す場合にプロセス全体が遅くなる可能性があるため、より高速に処理できます。 –

+1

これは以前のケースでしたが、ベンチマークされていましたが、割り振りが行われているため、一度にメッセージを返すことと、バッチを返すことのどちらも欠点がありませんでした。あなたはループでpoll(0)を使ってバッチを作成することができます - 私はPythonをよく知っていませんが、GitHubに問題があるかもしれません(またはこれについて議論することができます)。 – Treziac

+0

参考(私はそれをあなたのものと仮定します):https://github.com/confluentinc/confluent-kafka-python/issues/252 – Treziac

関連する問題