2017-02-14 15 views
0

メッセージキュー(ActiveMQ)をリッスンし、受信したメッセージをバッチでレディに追加するnode.jsコンポーネントを開発中です(バッチごとに20個必要) 。Node.jsアプリケーションがメッセージキューをリッスンし、非同期にredisにメッセージを追加する

ActiveMQから受信したメッセージの数が10個/秒以下の場合は問題ありません。

私の問題は、メッセージが4ミリ秒ごとにキューに追加されることです。これにより、バッチに追加されるレコードの数がバッチごとに20を超えることがあります。

const stompit = require('stompit'); 
var uuid = require('node-uuid'); 
var Redis = require('ioredis'); 

var redis = new Redis(); 
var pipeline = redis.pipeline(); 
var batchCounter = 0; 

stompit.connect({ host: 'localhost', port: 61613 }, function(err1, client) { 

client.subscribe({ destination: 'MyQueue' }, function(err2, msg) { 
    msg.readString('UTF-8', function(err3, body) { 

     if (batchCounter >= 20){ 
      pipeline.exec(function(err4, results) { 
       pipeline = redis.pipeline(); 
       batchCounter = 0; 
       console.log(results); 
      }); 
     } 

     batchCounter++; 
     pipeline.set(uuid.v1(), JSON.stringify(body)); 
     //client.disconnect(); 
     }); 
    }); 
    }); 

どのようにこの問題を解決できますか?

答えて

0

フラグを使用してバッチごとのレコード数を制御することになりました。私はActiveMQからの読み込みプロセスを制御している可能性がある、おそらくより効率的な修正があると確信しています。しかし、この状況では、フラグが私の仕事でした。フラグを使用して完全なコードは、以下のリンクにあります。

https://github.com/TamerB/ToActiveMQToRedis/blob/master/consumer/app.js

0

.execメソッドを呼び出す前にパイプラインをリセットしてみてください。これは非同期メソッドであると想定しています。 .execは将来、ある時点で実行されるため、その前にインクリメントとpipeline.setを実行できます。

次のコードは、現在のパイプラインを保存し、その後、新しいメッセージがその後、唯一の新しいパイプラインに追加する必要があります.exec

if (batchCounter >= 20){ 
    let fullpipeline = pipeline; 
    pipeline = redis.pipeline(); 
    batchCounter = 0; 
    fullpipeline.exec(function(err4, results) { 
     console.log(err4, results); 
    }); 
} 

前に同期し新しいものを作成します。

+0

それはあなたが一度にRedisのために行くの複数のパイプラインで終わる可能性がまだ可能です。それでも問題が発生する場合は、処理するフルパイプのキューが必要な場合があります。 – Matt

関連する問題