ノードストリームをRxjsオブザーバブルに変換しようとしています。ノードストリームをRx.jsオブザーバブルに変換する
1つのURLを試してみると、ストリーミング自体がうまくいきます。しかし、同じ機能をURLの配列にマップしようとすると、エラーが発生します。
ストリームをObservableに変換するためにRx.Nodeを使用しています。
これは私が現在
// data_array is an array of 10 urls that I'm scraping data from.
let parentStream = Rx.Observable.from(data_array);
parentStream.map(createStream).subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete'));
function createStream(url){
return RxNode.fromStream(x(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]).write().pipe(JSONStream.parse('*')))
}
をしようとしているものです。しかし、これは私が最初にflatMapは思っ出力X 10(data_array内のURL数)
RefCountObservable {
source:
ConnectableObservable {
source: AnonymousObservable { source: undefined, __subscribe: [Function] },
_connection: null,
_source: AnonymousObservable { source: [Object], __subscribe: [Function: subscribe] },
_subject:
Subject {
isDisposed: false,
isStopped: false,
observers: [],
hasError: false } },
_count: 0,
_connectableSubscription: null }
です観察可能なもので観察可能なものを平らにしているからです。しかし、私がflatMapを試してみると、私はこれを得ます:
Complete
Error TypeError: unknown type returned
しかし、私がしなければ、この:
これは1つのURLのために動作しますが、私は1つのストリームにdata_array内のURLのすべてをキャプチャすることはできません。
let stream = RxNode.fromStream(x(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]).write().pipe(JSONStream.parse('*')))
stream.subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete'))
私は何かを誤解してるように私は感じていないだけで、それがクリア複数のURLのために働いていませんが、それは第二の例で作業を行う場合でも....私が前にすべての最初の「コンプリート」を取得理由データが入ります。
明らかに、私は何かを誤解しています。どんな助けも素晴らしいだろう。ありがとう。
* UPDATE *
私は作品別のパスを、試してみましたが、ノードストリームを使用していません。ノードストリームは理想的なので、上記の例を有効にしたいと考えています。
私が次に使用したアプローチは、私のウェブスクレイピング機能の約束を包むことでした。これは、スクレイプです。これはうまくいきますが、結果は10個の巨大な配列で、各配列の各URLのすべてのデータが含まれています。私が本当に望むのは、データオブジェクトが通過するときに一連の変換を構成できるオブジェクトのストリームです。ここで
は異なりますが、アプローチを作業:
let parentStream = Rx.Observable.from(data_array);
parentStream.map(url => {
return Rx.Observable.defer(() => {
return scrape(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]);
})
})
.concatAll()
.subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete'));
function scrape(url, selector, scope) {
return new Promise(
(resolve, reject) => x(
url,
selector,
scope
)((error, result) => error != null ? reject(error) : resolve(result))
);
}