2017-07-04 11 views
0
var offset = 1; 
var limit = 500; 

var list = new Promise(function (resolve, reject) { 
    rets.getAutoLogoutClient(config.clientSettings, (client) => { 
    var results = client.search.query(SearchType, Class, Query, { 
     limit: limit, 
     offset: offset 
    }); 
    resolve(results); 
    }); 
}); 

var source = Rx.Observable.fromPromise(list); 

source.subscribe(results => console.log(results.count)); 

私はRETSを使用して不動産サイトを作っています。RXJSエキスパンダー

私のクエリを実行しようとしているのは、RETSサーバーに限られています。すべてのデータを取得するまで、オフセットを増やすループでこれを実行します。私は、クエリを実行してカウント値を見つけるまで、カウントが何であるかわかりません。

私はexpandを使用しようとしましたが、どのように正確に動作するかの手がかりはありません。これらの複数の方法をやろうとしましたが、古いファッションを使っていてもwhileループは動作しません。 RXJSは角度4で使用していますので、私はRXJSに目を向けました。

これは明示的に行われています。私は最終的に更新されたプロパティをフェッチするためにコーンジョブを実行する必要がありますが、私の問題はすべてのデータをフェッチし、カウントがオフセットよりも大きければ毎回オフセットを増やしています。私は私のデータを持っていたら

offset += limit 

、私はそれを保存する必要があります。したがって、たとえば、ここでは500の合計の上限と1のオフセットでクエリを実行する1690だから次のだろう私のオフセットを一周ですMongoDBへ。私はすでに成功しています。オフセットを手動で設定せずにすべてのデータを取得する方法を見つけるだけです。サーバーの制限は2500です

注意、はい優に超える2500

任意の提案を持っている可能性があり、そのような媒体として、私は一発でこのすべてを取得することができますが、他のデータもあるのですか?

答えて

0

これは実際には、多くのページングされたデータソース、または一度にリクエストできるものに限られているソースがあるため、RxJSではかなり一般的な使用例です。私の心に

私の2セント

expandおそらくあなたは、未知のデータソースに対してページ付けされていると、あなたが最終カウントを決定するために、少なくとも1つのクエリを必要とすることについて、このための最高の演算子です。あなたがより簡単なオプションを問い合わせることになるデータの量を知っていたら、mergeScanのようなものを使用することになりますが、私は逃げ出します。

提案された解決策

これはとても周りあなたの頭をラップするために少しの努力がかかることがあり、私は可能な限りどのように、これはすべての作品打破に注釈を追加しました。 注:私は実際にこれをテストしていないので、構文エラーは許せません。

// Your constant limit for any one query 
const limit = 500; 

// RxJS helper method that wraps the async call into an Observable 
// I am basing this on what I saw of your sample which leads me to believe 
// that this should work. 
const clientish = Rx.Observable.bindCallback(rets.getAutoLogoutClient); 

// A method wrapper around your query call that wraps the resulting promise 
// into a defer. 
const queryish = (client, params) => 
    // Note the use of defer here is deliberate, since the query returns 
    // a promise that will begin executing immediately, this prevents that behavior 
    // And forces execution on subscription. 
    Rx.Observable.defer(() => client.search.query(SearchType, Class, Query, params)); 

// This does the actual expansion function 
// Note this is a higher order function because the client and the parameters 
// are available at different times 
const expander = (client) => ({limit, count}) => 
    // Invoke the query method 
    queryish(client, {limit, count}) 
    // Remap the results, update offset and count and forward the whole 
    // package down stream 
    .map(results => ({ 
     limit, 
     count: results.count, 
     offset: offset + limit, 
     results 
    })); 


// Start the stream by constructing the client 
clientish(config.clientSettings) 
    .switchMap(client => 
    // This are the arguments for the initial call 
    Rx.Observable.of({limit, offset: 0}) 
     // Call the expander function with the client 
     // The second argument is the max concurrency, you can change that if needed 
     .expand(expander(client), 1) 

     // Expand will keep recursing unless you tell it to stop 
     // This will halt the execution once offset exceeds count, i.e. you have 
     // all the data 
     .takeWhile(({count, offset}) => offset < count) 

     // Further downstream you only care about the results 
     // So extract them from the message body and only forward them 
     .pluck('results') 
) 
    .subscribe(results => /*Do stuff with results*/); 
+0

これは機能しません。 switchMapは関数ではありません。 takeWhileには構文エラーがあり、クライアントの処理にはより良い統合が必要です。 –

+0

@ JoshuaScottどのバージョンのRxJSを使用していますか? – paulpdaniels

+0

私はRXJS 5を使用しています –

0
const retsConnect = Rx.Observable.create(function(observer) { 
    rets.getAutoLogoutClient(config.clientSettings, client => { 
    return searchQuery(client, 500, 1, observer); 
    }); 
}); 

function searchQuery(client, limit, offset, observer) { 
    let currentOffset = offset === undefined || offset === 0 ? 1 : offset; 
    return client.search.query(SearchType, Class, Query, {limit: limit, offset: currentOffset}) 
    .then(results => { 
     offset += limit; 
     observer.next(results.maxRowsExceeded); 
     if (results.maxRowsExceeded) { 
     console.log(offset); 
     return searchQuery(client, limit, offset, observer); 
     } else { 
     console.log('Completed'); 
     observer.complete(); 
     } 
    }); 
} 

retsConnect.subscribe(val => console.log(val)); 

これは、私がここにしようとしているものとどこかになってきています。私はまだこれを微調整中です。ですから、私が探しているのは、searchQueryをもっとダウンさせることです。 observer.nextを渡す必要があるかどうかわからないので、返すsearchQueryを再度インストールするには、どこにマップしてtakeUntilをインストールするかを決めます。私はtakeUntilが本当か偽かを取るかどうか分からない。このデータをmongodbに保存するだけです。だから、私はそれをこのままにして、私のセーブ方法をそこに入れてもいいと思うけど、それでもこれを理解したい。

注:さらに多くのデータがある場合、results.maxRowsExceededはtrueを返します。したがって、maxRowsがfalseを返すと、それは停止し、すべてのデータがフェッチされます。