2017-11-27 9 views
0

IBM Bluemixメッセージハブからメッセージを消費して生成するのにnode-rdkafka(https://github.com/Blizzard/node-rdkafka)を使用しています。私は、ノードプロセスごとに最大2つのトピックだけを購読する限り、問題なくメッセージを消費して生成することができます。 3つ以上のトピックを購読すると、私の消費者は購読されたトピックのいずれにもメッセージを受け取らなくなります。私はどんな誤りも見ない。カフカコンシューマーが3つ以上のトピックにサブスクライブしたときにメッセージを受信しない

ここにはソフトリミットはありますか?あるいは、私のコードにこの問題を引き起こす何かがありますか?サーバメモリのバンプアップは何の効果もないようです。

プロデューサーコード:

events.send = events.produce = (topic, type, data) => { 
    log.info('Sending message on topic ' + topic); 

    let producer = lib.getProducer(hubConfig); 

    // Connect to the broker manually 
    producer.connect({}, (err) => { 
    if (err) { 
     log.error('Producer failed to connect'); 
     log.error(err); 
    } 
    }); 

    // Wait for the ready event before proceeding 
    producer.on('ready',() => { 
    log.info('Producer ready, sending message'); 
    try { 
     producer.produce(
     topic, 
     null, 
     new Buffer(JSON.stringify(data)), 
     type, 
     Date.now() 
    ); 
    } catch (err) { 
     log.error('A problem occurred when sending our message'); 
     log.error(err); 
    } 
    }); 

    producer.on('event.error', (err) => { 
    log.error('Error from producer'); 
    log.error(err); 
    }) 
}; 

lib.getProducer = (hubConfig) => { 
    return new Kafka.Producer({ 
     'metadata.broker.list': hubConfig.kafka_brokers_sasl.join(','), 
     'security.protocol': 'sasl_ssl', 
     'ssl.ca.location': '/etc/ssl/certs', 
     'sasl.mechanisms': 'PLAIN', 
     'sasl.username': hubConfig.user, 
     'sasl.password': hubConfig.password, 
     'api.version.request': true, 
     'dr_cb': true, 
     'event_cb': true 
    }); 
}; 

消費者:

events.listen = events.consume = (topics, callback) => { 
    if (!_.isArray(topics)) { 
     topics = [topics]; 
    } 
    log.info('Subscribing to ' + topics.join(', ') + ' on test event listener...'); 
    let consumer, 
     emitter = new evt.EventEmitter(), 

     // Each consumer has a unique group and client ID 
     groupName = 'group-' + uuidv1(), 
     clientName = 'client-' + uuidv1(); 

    consumer = lib.getConsumer(hubConfig, groupName, clientName); 

    consumer.connect({}, (err) => { 
     if (err) { 
     log.error('Consumer failed to connect'); 
     log.error(err); 
     if (callback) callback(err); 
     } 
    }); 
    consumer 
     .on('ready', function() { 
     log.info('Consumer connected, subscribed to ' + topics.join(', ')); 
     consumer.subscribe(topics); 
     consumer.consume(); 
     if (callback) callback(); 
     }) 
     .on('data', function(data) { 
     let d = data.value.toString().replace(/"/g,''), 
      dupeKey = d + '-' + data.key; 
     if (!duplicateBuffer[dupeKey]) { 
      emitter.emit('message', { 
      data: d, 
      type: data.key, 
      topic: data.topic 
      }); 

      duplicateBuffer[dupeKey] = setTimeout(() => { 
      delete duplicateBuffer[dupeKey]; 
      }, DUPE_DELAY); 
     } else { 
      log.info('Ignoring duplicate event: ' + d + ' ' + data.type); 
     } 
     }) 
     .on('error', (err) => { 
     log.error(err); 
     emitter.emit('error', err); 
     }); 

    return emitter; 
    }; 

lib.getConsumer = (hubConfig, groupName, clientName) => { 
    return new Kafka.KafkaConsumer({ 
     'group.id': groupName, 
     'client.id': clientName, 
     'metadata.broker.list': hubConfig.kafka_brokers_sasl.join(','), 
     'security.protocol': 'sasl_ssl', 
     'ssl.ca.location': '/etc/ssl/certs', 
     'sasl.mechanisms': 'PLAIN', 
     'sasl.username': hubConfig.user, 
     'sasl.password': hubConfig.password, 
     'api.version.request': true, 
     'event_cb': true 
    }, {}); 
}; 

任意の提案ですか?

答えて

2

node-rdkafka Consumersで購読可能なトピック数については、ソフトリミットはありません。

私はnode-rdkafkaサンプル(https://github.com/ibm-messaging/message-hub-samples/tree/master/kafka-nodejs-console-sample)を3つのトピックを使用するように微調整し、期待通りにうまくいきました。

'debug': 'all' 

あなたのクライアントの設定の両方に:私はあなたのプロデューサーコードで表示されていない

ことの一つは、私が設定することをお勧めしたい。また調査を支援する

producer.setPollInterval(100); 

への呼び出しであり、 。

+0

デバッグメッセージが表示されたら、消費しようとしていたトピックの1つがメッセージハブに作成されていないことがわかりました。それですべてがうまくいくように整えました! – areiter

関連する問題