私はKafkaクライアントとプロデューサ用にシングルトンクラスを作成して、1つのオブジェクトのみを作成しました。 新しいクライアントとプロデューサインスタンスを作成せずに、同じトピックを複数回公開する必要があります。 私はproducer.on( 'ready'、fn(){})が同じクライアントとプロデューサインスタンスを使用してトリガされないことがわかりました。新しいクライアントとプロデューサオブジェクトを持っているときにトリガされるのは初めてです。ここ準備完了イベントのkafka-nodeがトリガーされていません
サンプルコード:
シングルトンクラス:
const kafka = require('kafka-node');
const logger = require('./../../../../applogger');
const kafkaConfig = require('./../../../../config/config');
function ConnectionProvider() {
let kafkaConnection = undefined;
let client = undefined;
this.getConnection =() => {
if (!this.kafkaConnection) {
logger.info("Creating new kafka connection ------------------------------------- ");
this.client = new kafka.Client(kafkaConfig.ZOOKPER_HOST);
this.kafkaConnection = new kafka.Producer(this.client);
}
return this.kafkaConnection;
};
this.getClient =() => {
if (!this.client) {
logger.info("Creating new kafka Client ------------------------------------- ");
this.client = new kafka.Client(kafkaConfig.ZOOKPER_HOST);
}
return this.client;
}
process.on('SIGINT', function() {
logger.info("Going to terminate kafka connection...!");
process.exit(0);
});
}
module.exports = exports = new ConnectionProvider;
トピック公開方法:
const kafkaClient = require('./../core/kafkaConnection');
const publishToKafka = function(dataPayload, callback) {
logger.debug('Publishing to topic ', topicName, ' with data: ', dataPayload);
let producer = kafkaClient.getConnection();
producer.on('ready', function() {
let payloads = dataPayload;
producer.send(payloads, function(err, data) {
if (err) {
logger.error(
'Error in publishing message to messaging pipeline ', err
);
callback(err, null);
return;
}
logger.debug('Published message to messaging pipeline topic ', topicName, ' with result: ', data);
callback(null, data);
return;
});
});
producer.on('error', function(err) {
logger.error(
'Error in publishing message to messaging pipeline ', err
);
producer.close();
});
};
DataPayloadである: dataPayload = [{トピックせ:someTopicを、メッセージ:someMessage }]
私はPublishToKafkaメソッドmultを呼び出す必要がありますkafkaクライアントとプロデューサのインスタンスを1つだけ作成したいと考えています。 プロデューサは、クライアントとプロデューサの同じオブジェクトを使用しているときにproducer.on( 'ready'、function(){})がトリガされないため、トピックをパブリッシュしません。
ありがとうございます。