2017-09-18 12 views
0

私はhighland.jsのプログラムを書いて、いくつかのファイルをダウンロードし、解凍してオブジェクトに分解し、オブジェクトストリームをflatMapで1つのストリームにマージして出力しました。1つのストリームにエラーが発生した場合、flatMapに出力がないのはなぜですか?

function download(url) { 
    return _(request(url)) 
     .through(zlib.createGunzip()) 
     .errors((err) => console.log('Error in gunzip', err)) 
     .through(toObjParser) 
     .errors((err) => console.log('Error in OsmToObj', err)); 
} 

const urlList = ['url_1', 'url_2', 'url_3']; 

_(urlList) 
    .flatMap(download) 
    .each(console.log); 

すべてのURLが有効な場合、正常に動作します。 URLが無効な場合、ダウンロードされたファイルがない場合、gunzipはエラーを報告します。私は、エラーが発生したときにストリームが閉じられると思う。私はflatMapが他のストリームを続けることを期待しますが、プログラムは他のファイルをダウンロードせず、何も印刷されません。

ストリームでエラーを処理する正しい方法と、1つのストリームにエラーが発生した後にflatMapを停止させない方法はありますか?

命令型プログラミングでは、エラーが発生した場所をトレースするデバッグログを追加できます。ストリーミングコードをデバッグするには?

PS。 toObjParserはノード変換ストリームです。これは、OSM XMLの読み取り可能なストリームを取得し、オーバーパスOSM JSONと互換性のあるオブジェクトのストリームを出力します。 https://www.npmjs.com/package/osm2obj

2017年12月19日更新を参照してください:

私は@amsrossが提案されているようerrorspushを呼び出そうとしました。 pushが実際に動作するかどうかを検証するために、私はXML文書をプッシュし、それは次のパーサによって解析され、出力から見ました。ただし、ストリームはまだ停止しており、url_3はダウンロードされませんでした。

function download(url) { 
    console.log('download', url); 
    return _(request(url)) 
     .through(zlib.createGunzip()) 
     .errors((err, push) => { 
      console.log('Error in gunzip', err); 
      push(null, Buffer.from(`<?xml version='1.0' encoding='UTF-8'?> 
<osmChange version="0.6"> 
<delete> 
<node id="1" version="2" timestamp="2008-10-15T10:06:55Z" uid="5553" user="foo" changeset="1" lat="30.2719406" lon="120.1663723"/> 
</delete> 
</osmChange>`)); 
     }) 
     .through(new OsmToObj()) 
     .errors((err) => console.log('Error in OsmToObj', err)); 
} 

const urlList = ['url_1_correct', 'url_2_wrong', 'url_3_correct']; 

_(urlList) 
    .flatMap(download) 
    .each(console.log); 

答えて

0

アップデート2017年12月19日: オクラホマので、私はあなたにこの上に良いなぜを与えることはできませんが、私はmergesequencedownloadから生じたストリームを消費するから切り替えることを伝えることができそれらをまとめておけば、おそらくあなたの後の結果が得られます。残念なことに(またはそうではありませんか?)、指定された順序で結果を戻すことはなくなります。

const request = require('request') 
const zlib = require('zlib') 
const h = require('highland') 

// just so you can see there isn't some sort of race 
const rnd = (min, max) => Math.floor((Math.random() * (max - min))) + min 
const delay = ms => x => h(push => setTimeout(() => { 
    push(null, x) 
    push(null, h.nil) 
}, ms)) 

const download = url => h(request(url)) 
    .flatMap(delay(rnd(0, 2000))) 
    .through(zlib.createGunzip()) 

h(['urlh1hcorrect', 'urlh2hwrong', 'urlh3hcorrect']) 
    .map(download).merge() 
    // vs .flatMap(download) or .map(download).sequence() 
    .errors(err => h.log(err)) 
    .each(h.log) 

アップデート2017年12月3日: エラーがストリームに遭遇したとき、それはそのストリームを終了します。これを避けるには、エラーを処理する必要があります。現在、errorsを使用してエラーを報告していますが、それを処理していません。あなたは、ストリーム内の次の値に移動するために、このような何かを行うことができます。

.errors((err, push) => { 
    console.log(err) 
    push(null) // push no error forward 
}) 

オリジナル: それはtoObjParserの入力と出力の種類がありますを知らなくてもお答えすることは困難です。

throughが提供される関数に値のストリームを通過して戻りの値のストリームを想定しているので、あなたの問題は、エラーは、内側ストリーム上で発生しているStream -> Object、又はStream -> Stream Objectような署名を有するtoObjParserに存在することができる、そのそれが消費されるまでエラーを出さない。

.each(console.log)の出力はどのようなものですか?ストリームを記録している場合、それはおそらくあなたの問題です。

+0

'toObjParser'はノード変換ストリームです。これは、OSM XMLの読み取り可能なストリームを取得し、オブジェクトのストリームを出力します。 https://www.npmjs.com/package/osm2obj – aleung

+0

Oooooooooh!私はその質問を誤解した。ストリームで前進するためにできることを反映するために私の答えを更新します。 – amsross

+0

私はプッシュしようとしましたが、それでも動作しませんでした。問題の私の更新を見てください。 – aleung

関連する問題