2017-06-01 16 views
0

私はKafkaクライアントとプロデューサ用にシングルトンクラスを作成して、1つのオブジェクトのみを作成しました。 新しいクライアントとプロデューサインスタンスを作成せずに、同じトピックを複数回公開する必要があります。 私はproducer.on( 'ready'、fn(){})が同じクライアントとプロデューサインスタンスを使用してトリガされないことがわかりました。新しいクライアントとプロデューサオブジェクトを持っているときにトリガされるのは初めてです。ここ準備完了イベントのkafka-nodeがトリガーされていません

サンプルコード:

シングルトンクラス:

const kafka = require('kafka-node'); 
const logger = require('./../../../../applogger'); 
const kafkaConfig = require('./../../../../config/config'); 

function ConnectionProvider() { 
    let kafkaConnection = undefined; 
    let client = undefined; 

    this.getConnection =() => { 

     if (!this.kafkaConnection) { 
      logger.info("Creating new kafka connection ------------------------------------- "); 
      this.client = new kafka.Client(kafkaConfig.ZOOKPER_HOST); 
      this.kafkaConnection = new kafka.Producer(this.client); 
     } 
     return this.kafkaConnection; 
    }; 
    this.getClient =() => { 
     if (!this.client) { 
      logger.info("Creating new kafka Client ------------------------------------- "); 
      this.client = new kafka.Client(kafkaConfig.ZOOKPER_HOST); 
     } 
     return this.client; 

    } 
    process.on('SIGINT', function() { 
     logger.info("Going to terminate kafka connection...!"); 
     process.exit(0); 
    }); 
} 
module.exports = exports = new ConnectionProvider; 

トピック公開方法:

const kafkaClient = require('./../core/kafkaConnection'); 

    const publishToKafka = function(dataPayload, callback) { 
     logger.debug('Publishing to topic ', topicName, ' with data: ', dataPayload); 
     let producer = kafkaClient.getConnection(); 

     producer.on('ready', function() { 
      let payloads = dataPayload; 
      producer.send(payloads, function(err, data) { 
       if (err) { 
        logger.error(
         'Error in publishing message to messaging pipeline ', err 
        ); 
        callback(err, null); 
        return; 
       } 

       logger.debug('Published message to messaging pipeline topic ', topicName, ' with result: ', data); 

       callback(null, data); 
       return; 
      }); 
     }); 

     producer.on('error', function(err) { 
      logger.error(
       'Error in publishing message to messaging pipeline ', err 
      ); 
      producer.close(); 
     }); 

    }; 

DataPayloadである: dataPayload = [{トピックせ:someTopicを、メッセージ:someMessage }]

私はPublishToKafkaメソッドmultを呼び出す必要がありますkafkaクライアントとプロデューサのインスタンスを1つだけ作成したいと考えています。 プロデューサは、クライアントとプロデューサの同じオブジェクトを使用しているときにproducer.on( 'ready'、function(){})がトリガされないため、トピックをパブリッシュしません。

ありがとうございます。

答えて

0

私はカフカプロデューサーに複数回パブリッシュする必要があるので、カフカプロデューサーとクライアントインスタンスを終了することでこれを解決しましたが、デフォルトではkafka zookeeperは60最大接続のみを許可します)。そのため、シングルカフカインスタンスのためのシングルトンクラスを作成しました。

しかし、kafkaの単一インスタンスを作成した後は、既にready状態にあるkafkaプロデューサの同じオブジェクトを使用したため、そのproducer.on( 'ready')イベントがトリガされません。毎回パブリッシュするための新しいプロデューサインスタンスが必要です。

const publishToKafka = function(topicName, dataPayload, callback) { 
    logger.debug('Publishing to topic ', topicName, ' with data: ', dataPayload); 
    let client = new kafka.Client(kakfaConfig.ZOOKPER_HOST); 
    let producer = new kafka.Producer(client); 


    producer.on('ready', function() { 
     let payloads = dataPayload; 
     producer.send(payloads, function(err, data) { 
      if (err) { 
       logger.error(
        'Error in publishing message to messaging pipeline ', err 
       ); 
       callback(err, null); 
       return; 
      } 

      logger.debug('Published message to messaging pipeline topic ', topicName, ' with result: ', data); 
      producer.close(); 
      client.close(); 
      callback(null, data); 

      return; 
     }); 
    }); 

    producer.on('error', function(err) { 
     logger.error(
      'Error in publishing message to messaging pipeline ', err 
     ); 
     producer.close(); 
    }); 

}; 

単一オブジェクトのシングルトンクラスを作成する必要はありません。

関連する問題