2016-06-28 6 views
0

いくつかのログタイプエントリを作成するためのバッチプロセスを作成しています。それは私のAPIとは別のアプリケーションであり、この1つのタスクだけを行います。今、私はlogMsgが配信されるエンドポイントを持つExpress APIを持っています。これらは多くのユーザーから来ており、各ユーザーは10秒ごとに1人を送信します。物事を動かすために、私はそのlogMsgの簡単な操作をいくつか行い、それを私のPostgres DBに貼り付けます。knexクエリービルダーの結果を同期処理する

今、私は私がより簡単に活動中に費やした時間を計算することができるように、いくつかのLOGENTRIESを構築するためにそれらのメッセージを反復します。それぞれのlogMsgには、アクティビティ(または1つ不足)、ユーザー、タイムスタンプがあります。 LogEntryには、user、activity、startTime、およびendTimeがあります。このようにして、APIアプリケーションは、ユーザーの活動やユーザーの活動の活動に費やす時間を簡単に計算できます。

だから私はいくつかのn個の未処理のポイントをつかむバッチジョブを設定したいのですが、その後、完全としてそれらをマークし、その後、それらを処理し、処理と同様に、それらをマークします。私が困っているところでは、物事を整えています。

let activityLogJob = new CronJob('30 * * * * *', function() { 

    return knex 
    .raw() //grab a chunk of rows and mark as In Progress 
    .then((processingRows) => { 
     _.forEach(logMsgs, (msg) => { 
      console.log('message id:', msg.id); 
      return knex.select() //latest log entry for msg.user 
       .then((logEntry) => { 
        if (logEntry) { 
         if (logEntry.activity_id !== msg.activity_id) { 
          console.log('new activity started. end old one'); 
          return knex.transaction((trx) => { 
           trx 
           .update() //logEntry.endTime = msg.startTime 
           .then((update) => { 
            trx 
            .insert(); //create new logEntry for new activity 
           }); 
          }); 
         } 
        } else { 
         console.log('first log entry for user'); 
         knex 
         .insert(); //create new logEntry since we don't have one for the user 
        } 
       }) 
       .then((result) => { 
        console.log('finished msg :', msg.id); 
        knex 
        .update(); //set msg.status = LOGGED 
       }) 
       .catch((err) => { 
        logger.error("ERROR: ", err); 
       }); 
     });  
    }); 

}, null, true); 

それでは、私は上記のコードを参照することは、細かい行のすべてをつかむが、それは実際にそれらを処理するために行くとき、それは奇妙な取得することです。ログのステートメントは次のようになります。

message 1 
first log entry for user 
message 2 
first log entry for user 
... 
message n 
first log entry for user 

finished message 1 
finished message 2 
... 
finished message n 

ここでは2つの問題が発生していると思います。
1.既存のLogEntryを検出せず、毎回新しいLogEntryを作成しません。それは、すべてのメッセージが処理されるまで挿入が起こっていないように感じます。
2.私は、私は私の問題は、私はまだ約束の周りに適切に私の頭をラップではないよということであるかなり確信してより

message 1 
first entry for user 
finished message 1 
message 2 
first entry for user 
finished message 2 
... 

ようになり、ログ・ステートメントを期待しているだろう。

答えて

0

あなたは別の約束のコールバック内にネストされていますすべての約束を返す必要があります。そうしないと、ネストされた約束の結果はチェーンを通過しません。それはあなたのメッセージが同期するのを助けるはずです。

return knex 
.raw() //grab a chunk of rows and mark as In Progress 
.then((processingRows) => { 
    _.forEach(logMsgs, (msg) => { 
     console.log('message id:', msg.id); 
     return knex.select() //latest log entry for msg.user 
      .then((logEntry) => { 
       if (logEntry) { 
        if (logEntry.activity_id !== msg.activity_id) { 
         console.log('new activity started. end old one'); 
         return knex.transaction((trx) => { 
          return trx ##### 
          .update() //logEntry.endTime = msg.startTime 
          .then((update) => { 
           return trx ##### 
           .insert(); //create new logEntry for new activity 
          }); 
         }); 
        } 
       } else { 
        console.log('first log entry for user'); 
        return knex ##### 
        .insert(); //create new logEntry since we don't have one for the user 
       } 
      }) 
      .then((result) => { 
       console.log('finished msg :', msg.id); 
       return knex ##### 
       .update(); //set msg.status = LOGGED 
      }) 
      .catch((err) => { 
       logger.error("ERROR: ", err); 
      }); 
    });  
}); 
関連する問題