2016-09-29 7 views
1

ノードストリームをRxjsオブザーバブルに変換しようとしています。ノードストリームをRx.jsオブザーバブルに変換する

1つのURLを試してみると、ストリーミング自体がうまくいきます。しかし、同じ機能をURLの配列にマップしようとすると、エラーが発生します。

ストリームをObservableに変換するためにRx.Nodeを使用しています。

これは私が現在

// data_array is an array of 10 urls that I'm scraping data from. 
let parentStream = Rx.Observable.from(data_array); 

parentStream.map(createStream).subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete')); 

function createStream(url){ 
    return RxNode.fromStream(x(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]).write().pipe(JSONStream.parse('*'))) 
} 

をしようとしているものです。しかし、これは私が最初にflatMapは思っ出力X 10(data_array内のURL数)

RefCountObservable { 
source: 
    ConnectableObservable { 
    source: AnonymousObservable { source: undefined, __subscribe: [Function] }, 
_connection: null, 
_source: AnonymousObservable { source: [Object], __subscribe: [Function: subscribe] }, 
_subject: 
    Subject { 
    isDisposed: false, 
    isStopped: false, 
    observers: [], 
    hasError: false } }, 
_count: 0, 
_connectableSubscription: null } 

です観察可能なもので観察可能なものを平らにしているからです。しかし、私がflatMapを試してみると、私はこれを得ます:

Complete 
Error TypeError: unknown type returned 

しかし、私がしなければ、この:

これは1つのURLのために動作しますが、私は1つのストリームにdata_array内のURLのすべてをキャプチャすることはできません。

let stream = RxNode.fromStream(x(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]).write().pipe(JSONStream.parse('*'))) 

stream.subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete')) 

私は何かを誤解してるように私は感じていないだけで、それがクリア複数のURLのために働いていませんが、それは第二の例で作業を行う場合でも....私が前にすべての最初の「コンプリート」を取得理由データが入ります。

明らかに、私は何かを誤解しています。どんな助けも素晴らしいだろう。ありがとう。

* UPDATE *

私は作品別のパスを、試してみましたが、ノードストリームを使用していません。ノードストリームは理想的なので、上記の例を有効にしたいと考えています。

私が次に使用したアプローチは、私のウェブスクレイピング機能の約束を包むことでした。これは、スクレイプです。これはうまくいきますが、結果は10個の巨大な配列で、各配列の各URLのすべてのデータが含まれています。私が本当に望むのは、データオブジェクトが通過するときに一連の変換を構成できるオブジェクトのストリームです。ここで

は異なりますが、アプローチを作業:

let parentStream = Rx.Observable.from(data_array); 

parentStream.map(url => { 
    return Rx.Observable.defer(() => { 
     return scrape(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]); 
    }) 
}) 
    .concatAll() 
    .subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete')); 

function scrape(url, selector, scope) { 
    return new Promise(
     (resolve, reject) => x(
      url, 
      selector, 
      scope 
     )((error, result) => error != null ? reject(error) : resolve(result)) 
    ); 
} 

答えて

1

*ソリューション* 私はそれを考え出しました。私は以下のソリューションを添付しました:

RxNodeを使用する代わりに、私はRx.Observable.fromEvent()を使用することを選択しました。

ノードストリームは、新しいデータでも、エラーでも、完了しても、イベントを放出します。

fromEventスタティックオペレータは、 'data'イベントをリスンし、イベントごとに新しいObservableを作成します。

私はそれらをマージし、購読します。コードは次のとおりです。

let parentStream = Rx.Observable.from(data_array); 
parentStream.map((url)=> { return createEventStream(url); }).mergeAll().subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete')); 

function createEventStream(url){ 
    return Rx.Observable.fromEvent(x(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]).write().pipe(JSONStream.parse('*')), 'data'); 
} 
関連する問題