0

node -sを使用して、redでカフカプロデューサノードを書き直しました。 私はKafka10がプロデューサに送信されたメッセージにタイムスタンプを付ける機能を追加したと述べましたが、私はJavaの例しか見つけませんでした。 nodejsを使用してメッセージにタイムスタンプを追加するにはどうすればよいですか?以下でkafkaにタイムスタンプを追加する方法nodejsを使ったメッセージ

私はプロデューサーがnodejsに実装されているかを報告:私はちょうどこれを見つけたGoogleで

function kafkaAdvancedNode(config) { 
    RED.nodes.createNode(this,config); 
    var topic = config.topic; 
    var partition = config.partition; 
    var clusterZookeeper = config.zkquorum; 
    var debug = (config.debug == "debug"); 
    var node = this; 
    var kafka = require('kafka-node'); 
    var HighLevelProducer = kafka.HighLevelProducer; 
    var Client = kafka.Client; 
    var topics = config.topics; 
    var client = new Client(clusterZookeeper); 

    try { 
     this.on("input", function(msg) { 
      var payloads = []; 

      // check if multiple topics 
      if (topics.indexOf(",") > -1){ 
       var topicArry = topics.split(','); 

       for (i = 0; i < topicArry.length; i++) { 
        payloads.push({topic: topicArry[i], messages: msg.payload}); 
       } 
      } 
      else { 
       if(partition == "" || !partition) 
        payloads = [{topic: topics, messages: msg.payload}]; 
       else 
        payloads = [{topic: topics, messages: msg.payload, partition: partition}]; 
      } 

      producer.send(payloads, function(err, data){ 
       if (err){ 
        node.error(err); 
       } 
       node.log("Message Sent: " + data); 
      }); 
     }); 
    } 
    catch(e) { 
     node.error(e); 
    } 
    var producer = new HighLevelProducer(client); 
    this.status({fill:"green",shape:"dot",text:"connected to "+clusterZookeeper}); 
} 

link

が、実際に私が何を(私のコードに統合する方法を理解していませんAvro?)

答えて

0

kafka-nodeが0.10カフカプロトコルをサポートしているとは思わないので、メッセージのタイムスタンプをサポートしていない可能性があります。

私はnode-rdkafkaを使って同様のことを書いて、node-red-contrib-rdkafkaとして公開しました。 https://github.com/hjespers/node-red-contrib-rdkafka

UPDATE:0.10 "イベント時刻"または "処理時間"タイムスタンプと動的トピック、キー、およびいずれかをサポートするnode-red-contrib-rdkafkaの新しいバージョン(0.2.0)を公開しました。パーティション。

+0

残念ながら、ラズベリーパイのインストール時にエラーが発生します。 リンク - > http://imgur.com/a/idV4U –

+0

私は、これがラズベリーパイで動作しているとは言いませんでした。このエラーはおそらく、Cで書かれた埋め込みlibrdkafkaライブラリによるものです。私は、rPIのような組み込みデバイスで使用するための純粋なJavaScriptのノード - 赤 - 寄与 - 合流モジュールも書いています。コンフルエントなRESTプロキシを使用してカフカにパブ/サブ発行します。これにより、タイムスタンプが自動的に取り込み時間として設定されます。また、ファイアウォールやインターネット経由では、プロキシまでHTTPを使用し、Kafkaプロトコルをデータセンター/クラウド内に制限するため、より効果的です。 –

+0

JSON-LD –

関連する問題