2016-09-11 13 views
1

現在、私のサービスが実行されていないときに、データ用の長期記憶域としてhadoopを使用するノードアプリケーションを構築しています。予想される転送量と、処理時間の最小化が望まれるため、データはディスクに書き込まれるのではなく、代わりに直接私が意図したものにパイプされます。NodeJSのタールパッキングからwebhdfsへのエラー

\nodejs_host\node_modules\webhdfs\lib\webhdfs.js:588 
    src.unpipe(req); 
     ^

TypeError: src.unpipe is not a function 
    at Request.onPipe (\nodejs_host\node_modules\webhdfs\lib\webhdfs.js:588:9) 
    at emitOne (events.js:101:20) 
    at Request.emit (events.js:188:7) 
    at Pack.Stream.pipe (stream.js:103:8) 
    at Object.hadoop.putServer (\nodejs_host\hadoop.js:37:29) 
    at Object.<anonymous> (\nodejs_host\hadoop.js:39:8) 
    at Module._compile (module.js:541:32) 
    at Object.Module._extensions..js (module.js:550:10) 
    at Module.load (module.js:458:32) 
    at tryModuleLoad (module.js:417:12) 

私は、次のマニュアルの私のコードをベースとしている:私は、次のエラーが発生します

https://github.com/npm/node-tar/blob/master/examples/packer.js https://github.com/harrisiirak/webhdfs/blob/master/README.md(リモートファイルへの書き込み)

これはコードIであります書面:

var webhdfs = require('webhdfs'); 
var fs = require('fs'); 
var tar = require('tar'); 
var fstream = require('fstream'); 

var hdfs = webhdfs.createClient({ 
    path: '/webhdfs/v1', 
    // private 
}); 

var hadoop = {} 

hadoop.putServer = function(userid, svcid, serverDirectory, callback){ 
    var readStream = fstream.Reader({path: serverDirectory, type: 'Directory'}) 
    var writeStream = hdfs.createWriteStream('/services/' + userid + '/' + svcid + '.tar') 
    var packer = tar.Pack({noProprietary: true}) 

    packer.on('error', function(){console.error(err), callback(err, false)}) 
    readStream.on('error', function(){console.error(err), callback(err, false)}) 
    writeStream.on('error', function(){console.error(err), callback(err, false)}) 
    writeStream.on('finish', function(){callback(null, true)}) 

    readStream.pipe(packer).pipe(writeStream); 
} 
hadoop.putServer('1', '1', 'C:/test', function(){console.log('hadoop.putServer test done')}); 

文書はこれがうまくいくはずだと示唆しています。私はこれらのモジュールのgithubのページに問題の上に周りを探して、588 here

req.on('pipe', function onPipe (src) { 
// Pause read stream 
stream = src; 
stream.pause(); 

// This is not an elegant solution but here we go 
// Basically we don't allow pipe() method to resume reading input 
// and set internal _readableState.flowing to false 
canResume = false; 
stream.on('resume', function() { 
    if (!canResume) { 
    stream._readableState.flowing = false; 
    } 
}); 

// Unpipe initial request 
src.unpipe(req); // <-- Line 588 
req.end(); 
}); 

答えて

0

よしとtar-FS用のtarパッケージを放棄言及誰かを見つけました:

はLIB \ webhdfsでチラッを持っていました。それに打撃を与え、すぐに誰かがwebhdfs &タールと関連する問題を持っている場合、タール-FS https://github.com/mafintosh/tar-fs

を見てみるので:)

作業

関連する問題