kafka-nodeを使用してkafkaでNodeを初めて使用しています。メッセージを消費するには、外部APIを呼び出す必要があります。外部APIを呼び出す必要があります。私は消費者が突然失敗するのを克服したい。消費者が失敗した場合、それを消費する消費者は仕事が完了しなかったという同じメッセージを受け取る。kafka-nodeを使用して消費されたkafkaメッセージのコミットを制御する方法
私はkafka 0.10を使用しており、ConsumerGroupを使用しようとしています。
私はオプションで設定し、その作業が完了した時点でメッセージをコミットすることを考えました(前に以前にいくつかのJavaコードで行ったように)。
しかし、いったん完了したらメッセージを正しくコミットする方法がわからないようです。どうすればいいですか?
もう一つの心配は、コールバックのために、前のメッセージが終了する前に次のメッセージが読み込まれているように見えることです。メッセージx + 2がメッセージx + 1の前に終了した場合、オフセットはx + 2に設定されるため、失敗した場合、x + 1は決して再実行されません。ここで
は、私がこれまで何をしたか、基本的である:あなたがここに行うことができます
var options = {
host: connectionString,
groupId: consumerGroupName,
id: clientId,
autoCommit: false
};
var kafka = require("kafka-node");
var ConsumerGroup = kafka.ConsumerGroup;
var consumerGroup = new ConsumerGroup(options, topic);
consumerGroup.on('connect', function() {
console.log("Consuming Kafka %s, topic=%s", JSON.stringify(options), topic);
});
consumerGroup.on('message', function(message) {
console.log('%s read msg Topic="%s" Partition=%s Offset=%d', this.client.clientId, message.topic, message.partition, message.offset);
console.log(message.value);
doSomeStuff(function() {
// HOW TO COMMIT????
consumerGroup.commit(function(err, data) {
console.log("------ Message done and committed ------");
});
});
});
consumerGroup.on('error', function(err) {
console.log("Error in consumer: " + err);
close();
});
process.once('SIGINT', function() {
close();
});
var close = function() {
// SHOULD SEND 'TRUE' TO CLOSE ???
consumerGroup.close(true, function(error) {
if (error) {
console.log("Consuming closed with error", error);
} else {
console.log("Consuming closed");
}
});
};
あなたのプロフィールへの回答ポイントへのリンク –
完了。指摘してくれてありがとう –