2017-06-15 6 views
0

いくつかのs3オブジェクトを取得してデータベースにインポートするためにこのスクリプトを書いたので、これをローカルdbに対して実行すると、そこにあるはずのすべての3265レコードがインポートされます。 AWS Postgresのインスタンスに対して実行すると、ちょっとした方法でハングアップして正常終了し、約50ほどのレコードしかインポートできません。私はそれが何らかのタイムアウトだったと思ったが、その効果を得ることができなかった。また、カスタムprocess.exitも無視します。私もPostgresのインスタンスログを調べてみましたが、そこには何も出ていませんでした。私は少し迷っています。ノードの繊細さや、このコードを書いたやり方を見逃しているかもしれません。なぜこの特定のコードは、ローカルDBではなくリモートdbに対して実行されている途中で終了するのですか?

const { NODE_ENV } = process.env 
import config from '../config' 

config() 

import AWS from 'aws-sdk'; 
import db from './db/sequelize/models/db_connection' 
process.on('uncaughtException', function (exception, p) { 
    console.log(p) 
    console.log(exception); 
}); 

class FailedImport extends Error { 
    constructor(message) { 
    Error.captureStackTrace(this, this.constructor); 
    this.name = this.constructor.name; 
    this.message = message; 
    } 
} 

var s3 = new AWS.S3() 
async function listObjects() { 
    return await s3.listObjects({Bucket: process.env.S3_BUCKET_NAME}).promise() 
} 

function importData(objectList) { 
    return objectList.Contents.map(async (obj) => { 
    try { 
     let data = await s3.getObject({ Bucket: process.env.S3_BUCKET_NAME, Key: obj.Key}).promise() 
     let body = data.Body 
     let dataLines = body.toString().split('\n') 
     return Promise.all(dataLines.map(async (line) => { 
     try { 
      let jsonifiedLine = JSON.parse(line) 
      return await db.Site.upsert({ url: jsonifiedLine['api_url'], quantcast_rank: 0}) 
     } catch(e) { 
      console.error(e) 
     } 
     })) 
    } 
    catch(err) { 
     console.log(err) 
    } 
    }) 
} 

export function runImport() { 
    listObjects().then((objects) => { 
    return Promise.all(importData(objects)) 
     .then(() => console.log('Finished import.')) 
     .catch((err) => console.log(err)) 
    }).catch((err) => { 
    console.log(err) 
    throw new FailedImport(err) 
    }) 
} 

runImport() 

答えて

0

それは私が約束アーキテクチャの2つの非常に異なるスタイルを混合して、私はマップが非同期安全ではありませんでした使用していた道をそれを先頭することが判明しました。私は私のループのために切り替えて巻き返し、非同期の待ち受けを待って、コードを修正しました。比較する新しいコード:

class FailedImport extends Error { 
    constructor(message) { 
    Error.captureStackTrace(this, this.constructor); 
    this.name = this.constructor.name; 
    this.message = message; 
    } 
} 

var s3 = new AWS.S3() 

async function listObjects() { 
    return await s3.listObjects({Bucket: process.env.S3_BUCKET_NAME, Prefix: "datasets/cleaned.data/"}).promise() 
} 

async function importData(objectList) { 
    try { 
    for (let obj of objectList.Contents) { 
     let data = await s3.getObject({ Bucket: process.env.S3_BUCKET_NAME, Key: obj.Key}).promise() 
     console.log(obj.Key) 
     let body = data.Body 
     let dataLines = body.toString().split('\n') 
     dataLines.pop() 
     let jsonLines = dataLines.map((row) => JSON.parse(row.trim())) 
     for (let line of jsonLines) { 
     try { 
      await db.Site.upsert({ url: line['api_url'], quantcast_rank: 0}) 
     } 
     catch(err) { 
      console.log(err) 
     } 
     } 
    } 
    } 
    catch(err) { 
    console.log(err) 
    } 
} 

export function runImport() { 
    listObjects().then((objects) => { 
    importData(objects) 
     .then(() => console.log('Finished.')) 
     .catch((err) => console.error(err)) 
    }).catch((err) => { 
    console.error(err) 
    }) 
} 
関連する問題