2017-03-03 6 views
1

私は投票申請を収集し、それらをCassandraに格納するNode Appを持っています。投票は、base64でコード化された暗号化された文字列として格納されます。 APIには/exportというエンドポイントがあり、すべてのこれらの投票文字列(おそらく100万以上)を取得し、それらをバイナリに変換し、後でvote.egdファイルに追加する必要があります。そのファイルは圧縮されてクライアントに送信されます。私の考えは、各投票文字列をバイナリに変換してWriteStreamに書き込んで、Cassandraから行をストリームすることです。 この機能をPromiseで包み込み、使いやすいようにします。私は以下を持っています:Cassandraからバックプレッシャーを考慮したファイルへのストリームデータ

streamVotesToFile(query, validVotesFileBasename) { 
    return new Promise((resolve, reject) => { 
    const writeStream = fs.createWriteStream(`${validVotesFileBasename}.egd`); 

    writeStream.on('error', (err) => { 
     logger.error(`Writestream ${validVotesFileBasename}.egd error`); 
     reject(err); 
    }); 

    writeStream.on('drain',() => { 
     logger.info(`Writestream ${validVotesFileBasename}.egd error`); 
    }) 

    db.client.stream(query) 
    .on('readable', function() { 
     let row = this.read(); 
     while (row) { 
     const envelope = new Buffer(row.vote, 'base64'); 
     if(!writeStream.write(envelope + '\n')) { 
      logger.error(`Couldn't write vote`); 
     } 
     row = this.read() 
     } 
    }) 
    .on('end',() => { // No more rows from Cassandra 
     writeStream.end(); 
     writeStream.on('finish',() => { 
     logger.info(`Stream done writing`); 
     resolve(); 
     }); 
    }) 
    .on('error', (err) => { // err is a response error from Cassandra 
     reject(err); 
    }); 
    }); 
} 

これを実行すると、すべての投票がファイルに追加され、正常にダウンロードされます。しかし、私が持っている問題/質問の束があります

  1. それはアプリに他のすべての要求を実行している間、私は/exportエンドポイントへのreqを作成し、この関数が実行されている場合は非常に遅いですまたはちょうどありませんエクスポート要求が完了する前に終了します。私は、イベントループがカッサンドラのストリーム(毎秒千人)からのこれらの出来事のすべてによって騒がれているので、推測していますか?

  2. すべての票がファイルに書かれているようですが、ほとんどすべてwriteStream.write()コールではfalseとなり、対応するログメッセージ(コード参照)が表示されます。

  3. WritableStreamのバックプレッシャと 'drain'イベントを考慮する必要があることを理想的には私はpipe()を使用し、バックプレッシャサポート(右?)が組み込まれているため投票をパイプにパイプするのが理想です。各行を処理する(バイナリに変換し、将来他の行フィールドから他のデータを追加する可能性があります)、どのようにパイプで行うのでしょうか?

答えて

0

このTransformStreamのための完璧なユースケース:

const myTransform = new Transform({ 
    readableObjectMode: true, 
    transform(row, encoding, callback) { 
    // Transform the row into something else 
    const item = new Buffer(row['vote'], 'base64'); 
    callback(null, item); 
    } 
}); 

client.stream(query, params, { prepare: true }) 
    .pipe(myTransform) 
    .pipe(fileStream); 

Node.js API DocsTransformStreamを実装する方法の詳細を参照してください。

+0

意味があります。しかし、私があなたのコードを使用して^ ^エラーが発生しました:TypeError:ValidChunk(_stream_writable.js:216:10)のTransform.Writable.write(_stream_writable.js:245:12)のResultStreamで無効な非文字列/バッファチャンクResultStream.Readable.read(_stream_readable.js:381:10)のResultStream.emit(events.js:188:7)のemitOne(events.js:96:13)の.ondata(_stream_readable.js:555:20) – gcosta

+1

クライアントストリームはobjectModeのReadable Streams2オブジェクトを返すので、ああ、myTransformはobjectModeのTransformストリームである必要があります。 (objectModeをtrueに設定します)。ありがとう! – gcosta

関連する問題