2016-10-20 11 views
1

HighlevelProducerとHighlevelConsumerを使用してメッセージを送受信しています。 HighlevelConsumerは、正常に生成されたときにのみメッセージをコミットしたいので、autoCommit = falseで構成されています。問題は、最初のメッセージが本当に決して転覆しないということです。highlevelConsumerでautoCommit = falseを使用するとコミット順が正しくない

例:

  • メッセージ1-10を送信します。
  • メッセージ1
  • メッセージ2
  • メッセージ2
  • ...
  • 私がした場合メッセージ1

をコミットメッセージ10

  • をコミットメッセージ10
  • を受信するコミットを受信する受信私のコンシューマーを再起動すると、1から10までのすべてのメッセージが再び処理されます。消費者に新しいメッセージを送信した場合にのみ、古いメッセージがコミットされます。これは、任意の数のメッセージに対して発生します。

    次のように私のコードの読み取り:

    var kafka = require('kafka-node'), 
        HighLevelConsumer = kafka.HighLevelConsumer, 
        client = new kafka.Client("localhost:2181/"); 
    consumer = new HighLevelConsumer(
        client, 
        [ 
         { topic: 'mytopic' } 
        ], 
        { 
         groupId: 'my-group', 
         id: "my-consumer-1", 
         autoCommit: false 
        } 
    ); 
    
    consumer.on('message', function (message) { 
        console.log("consume: " + message.offset); 
        consumer.commit(function (err, data) { 
         console.log("commited:" + message.offset); 
        }); 
        console.log("consumed:" + message.offset); 
    }); 
    
    process.on('SIGINT', function() { 
        consumer.close(true, function() { 
         process.exit(); 
        }); 
    }); 
    
    process.on('exit', function() { 
        consumer.close(true, function() { 
         process.exit(); 
        }); 
    }); 
    
    var messages = 10; 
    var kafka = require('kafka-node'), 
        HighLevelProducer = kafka.HighLevelProducer, 
        client = new kafka.Client("localhost:2181/"); 
    var producer = new HighLevelProducer(client, { partitionerType: 2, requireAcks: 1 }); 
    
    producer.on('error', function (err) { console.log(err) }); 
    producer.on('ready', function() { 
        for (i = 0; i < messages; i++) { 
         payloads = [{ topic: 'mytopic', messages: "" }]; 
         producer.send(payloads, function (err, data) { 
          err ? console.log(i + "err", err) : console.log(i + "data", data); 
         }); 
        } 
    }); 
    

    私が何か間違ったことか、これはカフカノードのバグであるだろうか? (お互い、すなわち、2コミットする前に発生した後

  • +0

    [commitAsyncが最初の2つのオフセットをコミットできない理由](http://stackoverflow.com/questions/37794718/why-commitasync-fails-to-commit-the-first-2-offsets) –

    答えて

    0

    Aが迅速に行われたメッセージ1.

    は、あなたのコミットを非同期で実行し、メッセージ1とメッセージ2のコミットされたようで、暗黙的なコミットされたメッセージ2のコミット消費者が1)のコミットを送信した場合、最初のコミットは明示的に行われず、メッセージ2の1回のコミットのみが送信されます。

    関連する問題