kafka-python apiを使ってトピックにたくさんのメッセージを送ります。成功したトピックに送信されますが、プログラムには、次のエラーメッセージを表示して終了する前にいないそれらのすべてが送信されたメッセージの一部:KeyError:kafka.producer.record_accumulator.RecordBatch
KeyError: <kafka.producer.record_accumulator.RecordBatch object at 0x143d290>
Batch is already closed -- ignoring batch.done()
Error processing errback
Traceback (most recent call last):
File "/usr/lib/python2.6/site-packages/kafka/future.py", line 79, in _call_backs
f(value)
File "/usr/lib/python2.6/site-packages/kafka/producer/sender.py", line 185, in _failed_produce
self._complete_batch(batch, error, -1, None)
File "/usr/lib/python2.6/site-packages/kafka/producer/sender.py", line 243, in _complete_batch
self._accumulator.deallocate(batch)
File "/usr/lib/python2.6/site-packages/kafka/producer/record_accumulator.py", line 507, in deallocate
self._incomplete.remove(batch)
File "/usr/lib/python2.6/site-packages/kafka/producer/record_accumulator.py", line 587, in remove
return self._incomplete.remove(batch)
すべてのメッセージの異なる数を実行するには、実際に私のトピックで受信されています。この問題は、kafka producer.send呼び出しがプログラムが終了する前に送信を完了しないことが原因です。カフカドキュメンテーションのproducer.sendによる
は、おそらく根本的な原因である非同期メソッドである - すべての非同期スレッドを完了していないプロセスが殺される前に送信:
The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.
このためナイーブソリューションの数があります(たとえば、batch.size
を低く設定するなど)、パフォーマンスのボトルネックが発生する可能性があります。
この問題を解決するにはどうすれば性能を損なうことなく、を解決しますか?
私が試した最初のことでした。この説明で指摘したように、プロデューサはメッセージを送信するために非同期呼び出しをディスパッチしたように見えますが、バッチサイズが十分に小さくないため、まだ完了していません。 – r2d2oid
これは質問に対する答えを提供しません。十分な[評判](https://stackoverflow.com/help/whats-reputation)があれば、[投稿にコメントする]ことができます(https://stackoverflow.com/help/privileges/comment)。代わりに、[質問者からの明確化を必要としない回答を提供する](https://meta.stackexchange.com/questions/214173/why-do-i-need-50-reputation-to-comment-what-can- i-do-代わりに)。 - [レビューの投稿](レビュー/低品質の投稿/ 17956884) – demonplus
@ r2d2あなたは問題はかなり簡単です。プロデューサーがすべてのメッセージを送信する前に、プログラムの終了をあなた自身で説明しました。バッチサイズで再生することで解決しようとしているのは正しくありません。バッファに何かがある場合は、送信が完了するまで待つ必要があります。 [flush](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html)は、everythinが送信されるまでプログラムをブロックします。それだけです。最後のレコードを送った後に 'producer.flush()'を呼び出した後にこのバグが発生した場合、私は間違っていて、あなたの問題(kafka-pythonのバグ)を理解できません。 – Loki