I以前私は、それぞれ以下のようにKafkaProducerが新た各要求で構築されたrequest.whereためプロデューサを作成するコードの下使用し、メッセージを生成するプロデューサのクラスを使用するマルチスレッドアプリケーションを持っている:私はカフカを読ん一度閉じたカフカプロデューサーを再接続するには?
KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(prop);
ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, objBytes);
producer.send(data, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
isValidMsg[0] = false;
exception.printStackTrace();
saveOrUpdateLog(msgBean, producerType, exception);
logger.error("ERROR:Unable to produce message.",exception);
}
}
});
producer.close();
私たちが良いパフォーマンスを得るためには、単一のプロデューサインスタンスを使用すべきであることを知りました。
次に、シングルトンクラス内にKafkaProducerの単一インスタンスを作成しました。
今度は&プロデューサーを閉じる必要があります。
java.lang.IllegalStateException: Cannot send after the producer is closed.
OR我々は一度閉じプロデューサーに再接続することができる方法を:明らかに最初の要求を送信した後、我々は生産を終了した場合、それは文句を言わないので、投げたメッセージを再送信するプロデューサーを見つけます。 プログラムがクラッシュしたり、例外が発生したりするのは問題ですか?
これはsync/asyncの両方です。さらに、例外が発生した場合:アプリケーションがクラッシュし、再接続する方法がいくつか考えられます。注:私はnullではないKafkaProducerのisntanceを再初期化せず、close()メソッドが呼び出された後でもすべてのプロパティを保持します。また、私は複数のアプリすなわち4つの消費者がこの共有のプロデューサーを使用して複数のトピックにmsgを送信します。 – usman
@usmanなぜあなたはそれが同期と非同期の両方であると言いますか? [同期版はどこにありますか](https://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html)? –
https://kafka.apache.org/08/documentation#implementation "2つの低レベルのプロデューサをラップするプロデューサAPI - "、。まあ、kafkaが提供する方法はありますか?あなたのコードは、オブジェクトをインスタンス化し直さなければならないことを示しています。 – usman