node -sを使用して、redでカフカプロデューサノードを書き直しました。 私はKafka10がプロデューサに送信されたメッセージにタイムスタンプを付ける機能を追加したと述べましたが、私はJavaの例しか見つけませんでした。 nodejsを使用してメッセージにタイムスタンプを追加するにはどうすればよいですか?以下でkafkaにタイムスタンプを追加する方法nodejsを使ったメッセージ
私はプロデューサーがnodejsに実装されているかを報告:私はちょうどこれを見つけたGoogleで
function kafkaAdvancedNode(config) {
RED.nodes.createNode(this,config);
var topic = config.topic;
var partition = config.partition;
var clusterZookeeper = config.zkquorum;
var debug = (config.debug == "debug");
var node = this;
var kafka = require('kafka-node');
var HighLevelProducer = kafka.HighLevelProducer;
var Client = kafka.Client;
var topics = config.topics;
var client = new Client(clusterZookeeper);
try {
this.on("input", function(msg) {
var payloads = [];
// check if multiple topics
if (topics.indexOf(",") > -1){
var topicArry = topics.split(',');
for (i = 0; i < topicArry.length; i++) {
payloads.push({topic: topicArry[i], messages: msg.payload});
}
}
else {
if(partition == "" || !partition)
payloads = [{topic: topics, messages: msg.payload}];
else
payloads = [{topic: topics, messages: msg.payload, partition: partition}];
}
producer.send(payloads, function(err, data){
if (err){
node.error(err);
}
node.log("Message Sent: " + data);
});
});
}
catch(e) {
node.error(e);
}
var producer = new HighLevelProducer(client);
this.status({fill:"green",shape:"dot",text:"connected to "+clusterZookeeper});
}
:link
が、実際に私が何を(私のコードに統合する方法を理解していませんAvro?)
残念ながら、ラズベリーパイのインストール時にエラーが発生します。 リンク - > http://imgur.com/a/idV4U –
私は、これがラズベリーパイで動作しているとは言いませんでした。このエラーはおそらく、Cで書かれた埋め込みlibrdkafkaライブラリによるものです。私は、rPIのような組み込みデバイスで使用するための純粋なJavaScriptのノード - 赤 - 寄与 - 合流モジュールも書いています。コンフルエントなRESTプロキシを使用してカフカにパブ/サブ発行します。これにより、タイムスタンプが自動的に取り込み時間として設定されます。また、ファイアウォールやインターネット経由では、プロキシまでHTTPを使用し、Kafkaプロトコルをデータセンター/クラウド内に制限するため、より効果的です。 –
JSON-LD –