2017-03-26 25 views
2

TL - RxJSを使用しているときに、REST APIへの同時接続のHTTPリクエスト数を制御する方法を探しています。RxJSとの並行性を管理する方法はありますか?

私のNode.jsアプリケーションは、数千のREST APIコールをサードパーティプロバイダに送信します。しかし、すべてのリクエストを一度に行うと、DDoS攻撃のためにサービスがダウンしたりリクエストを拒否する可能性があることはわかっています。だから、私はいつでも最大同時接続数を設定したいと思う。私はThroat Packageを利用してPromisesで同時実行制御を実装していましたが、これを実装するのと同様の方法は見つかりませんでした。

この投稿How to limit the concurrency of flatMap?で提案されているように、mergeを1と一致させようとしましたが、すべての要求がすぐに送信されます。

は、ここに私のコードです:

var Rx = require('rx'), 
    rp = require('request-promise'); 

var array = ['https://httpbin.org/ip', 'https://httpbin.org/user-agent', 
    'https://httpbin.org/delay/3', 
    'https://httpbin.org/delay/3', 
    'https://httpbin.org/delay/3' 
]; 

var source = Rx.Observable.fromArray(array).map(httpGet).merge(1); 

function httpGet(url) { 
    return rp.get(url); 
} 

var results = []; 
var subscription = source.subscribe(
    function (x) { 
    console.log('=====', x, '======'); 
    }, 
    function (err) { 
    console.log('Error: ' + err); 
    }, 
    function() { 
    console.log('Completed'); 
    }); 
+2

http://stackoverflow.com/documentation/rxjs/8247/common-recipes/27973/sending-multiple-parallel-http-requests#t=201703261009146257815 – martin

答えて

1

感謝。私の問題はrxjs NPMモジュールの代わりにrxを使用することと関係していました。 rxをアンインストールしてrxjsをインストールすると、すべてのサンプルが並行処理を期待通りに使い始めました。したがって、Promises、Callbacks、およびNative ObservableとのHTTP同時呼び出しは正常に機能しました。

誰もが同様の問題に遭遇し、トラブルシューティングを行うことができるように、ここに投稿しています。

HTTPリクエストのコールバックベースのサンプル:

var Rx = require('rxjs'), 
    request = require('request'), 
    request_rx = Rx.Observable.bindCallback(request.get); 

var array = [ 
    'https://httpbin.org/ip', 
    'https://httpbin.org/user-agent', 
    'https://httpbin.org/delay/3', 
    'https://httpbin.org/delay/3', 
    'https://httpbin.org/delay/3' 
]; 

var source = Rx.Observable.from(array).mergeMap(httpGet, 1); 

function httpGet(url) { 
    return request_rx(url); 
} 

var subscription = source.subscribe(
    function (x, body) { 
    console.log('=====', x[1].body, '======'); 
    }, 
    function (err) { 
    console.log('Error: ' + err); 
    }, 
    function() { 
    console.log('Completed'); 
    }); 
約束ベースのサンプル:

var Rx = require('rxjs'), 
    rp = require('request-promise'); 

var array = ['https://httpbin.org/ip', 'https://httpbin.org/user-agent', 
    'https://httpbin.org/delay/3', 
    'https://httpbin.org/delay/3', 
    'https://httpbin.org/delay/3' 
]; 

var source = Rx.Observable.from(array).mergeMap(httpGet, 1); 

function httpGet(url) { 
    return rp.get(url); 
} 

var results = []; 
var subscription = source.subscribe(
    function (x) { 
    console.log('=====', x, '======'); 
    }, 
    function (err) { 
    console.log('Error: ' + err); 
    }, 
    function() { 
    console.log('Completed'); 
    }); 

ネイティブRxJSサンプル:

var Rx = require('rxjs'), 
    superagent = require('superagent'), 
    Observable = require('rxjs').Observable; 

var array = [ 
    'https://httpbin.org/ip', 
    'https://httpbin.org/user-agent', 
    'https://httpbin.org/delay/10', 
    'https://httpbin.org/delay/2', 
    'https://httpbin.org/delay/2', 
    'https://httpbin.org/delay/1', 
]; 

let start = (new Date()).getTime(); 

var source = Rx.Observable.from(array) 
    .mergeMap(httpGet, null, 1) 
    .timestamp() 
    .map(stamp => [stamp.timestamp - start, stamp.value]); 

function httpGet(apiUrl) { 
    return Observable.create((observer) => { 
    superagent 
     .get(apiUrl) 
     .end((err, res) => { 
      if (err) { 
       return observer.onError(err); 
      } 
      let data, 
       inspiration; 
      data = JSON.parse(res.text); 
      inspiration = data; 
      observer.next(inspiration); 
      observer.complete(); 
     }); 
    }); 
} 

var subscription = source.subscribe(
    function (x) { 
    console.log('=====', x, '======'); 
    }); 
1

あなたはHTTPリクエストを実行し、構成、観察に回答を平らにするmergeMap演算子を使用することができます。 1として指定concurrentmergeMapconcatMapと同等であること

let source = Rx.Observable 
    .fromArray(array) 
    .mergeMap(httpGet, 1); 

注:mergeMapあなたが同時に加入観測(すなわち、HTTP要求)の最大数を指定可能なオプションconcurrentパラメータを受け取ります。

ご質問のコードがすべてのリクエストを一度に送信する理由は、mapオペレータのhttpGet機能の呼び出しまでです。 httpGetはPromiseを返し、約束は怠惰ではない - httpGetが呼び出されるとすぐに、要求が送信されます。

上記のコードでは、httpGetは、指定された数の同時要求より少ない数がある場合、mergeMapの実装でのみ呼び出されます。

上記のコードは、構成された観測可能性とは別に各応答を放出します。あなたはまた、マーティンは彼のコメントで参照しているレシピをチェックアウトする必要があります

let source = Rx.Observable 
    .fromArray(array) 
    .mergeMap(httpGet, 1) 
    .toArray(); 

:あなたはすべての要求が完了した際に放出される配列に結合応答をしたい場合は、toArray演算子を使用することができます。

+0

ありがとうございます。あなたの答えは、私のサンプルにいくつかの微調整が必​​要であることを理解するためのヒントをくれました。また、rx!= rxjsです。 rxは日付が付いていて、並行性は機能しませんでした。上記の例を確認してください。 – Diego

2

Rx.Observable.fromPromiseが役に立つ場合があります。 concurrent1として指定された場合cartantの答えを拡張し、これを試してみてください、:時間ベースの制御のために

Rx.Observable.from(array) 
    .mergeMap(url => Rx.Observable.fromPromise(rp.get(url)), 1) 
    .subscribe(x => console.log(x)) 

、これは私が考えることができるものです:上記の応答のための

Rx.Observable.from(array) 
    .bufferCount(2) 
    .zip(Rx.Observable.timer(0, 1000), x => x) 
    .mergeMap(x => Rx.Observable.from(x) 
    .mergeMap(url => Rx.Observable.fromPromise(rp.get(url))) 
    .subscribe(x => console.log(x)) 
関連する問題