2017-09-24 9 views
1

生成されたレコードが失敗した場合にコールバックを起動するように設定したいと思います。最初は、失敗したレコードを記録するだけです。kafka-python kafka.KafkaProducer#send()に失敗したコールバックを追加するには?

コンフルエントカフカのPythonライブラリのコールバックを追加するためのメカニズムを提供します:

produce(topic[, value][, key][, partition][, on_delivery][, timestamp]) 
... 
    on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed delivery 

どのように私はそれがkafka.SimpleClient#send_produce_request()

答えて

2

を使用して、非推奨SimpleClientを使用することなく、カフカ-pythonのkafka.KafkaProducer#send()と同様の動作を実現することができますこれは文書化されていませんが、これは比較的簡単です。メッセージを送信するたびにすぐにFutureが返ってきます。あなたはそのFutureへのコールバック/エラーバックのを追加することができます

F = producer.send(topic=topic, value=message, key=key) 
F.add_callback(callback, message=message, **kwargs_to_pass_to_callback_method) 
F.add_errback(erback, message=message, **kwargs_to_pass_to_errback_method) 

関連するソースコードをここに: https://github.com/dpkp/kafka-python/blob/1937ce59b4706b44091bb536a9b810ae657c3225/kafka/future.py#L48-L64

を私たちは本当に私はそれを追跡するためにhttps://github.com/dpkp/kafka-python/issues/1256を提出し、これを文書化する必要があります。

関連する問題