2016-11-23 8 views
0

私のAPIは、約100回のダウンストリームコールをペアで2つの別々のサービスにします。クライアントに返答する前に、すべての回答を集約する必要があります。私はhystrix-feignを使ってHTTP呼び出しを行います。observableをblocking observableに変換してrxJavaを乱用していますか?

私は、次の

BlockingObservable演算子を遮断提供観察可能な各種のあることがわかりましたrxJava docsにまでエレガントなソリューションだと信じていたものを思い付きました。テストやデモの目的には便利ですが、一般的には実稼動アプリケーションには適していません(Blocking Observableを使用する必要があると思われる場合は、通常、デザインを再考する必要があります)。

この設定に基づいて

List<Observable<C>> observables = new ArrayList<>(); 
for (RequestPair request : requests) { 
    Observable<C> zipped = Observable.zip(
     feignClientA.sendRequest(request.A()), 
     feignClientB.sendRequest(request.B()), 
     (a, b) -> new C(a,b)); 
    observables.add(zipped); 
} 

Collection<D> apiResponse = = new ConcurrentLinkedQueue<>(); 

Observable 
    .merge(observables) 
    .toBlocking() 
    .forEach(combinedResponse -> apiResponse.add(doSomeWork(combinedResponse))); 

return apiResponse; 

いくつかの質問次のように私のコードは、おおよそになります

  • が、私は理解して修正アム私のユースケース与えられた正当化

    1. はtoBlocking()されていますメインスレッドがforEach()に到達するまで、実際のHTTP呼び出しは行われません。
    2. forEach()ブロックは異なるスレッドによって実行されますが、forEach()ブロックに複数のスレッドが存在するかどうかを確認することはできませんでした。実行は並行していますか?
  • 答えて

    1

    より良いオプションがObservableは、他の演算子によって消費されるが、あなたは、コードをブロックして逃げることが返すことです(ただし、バックグラウンドスレッドで実行する必要があります。)

    public Observable<D> getAll(Iterable<RequestPair> requests) { 
        return Observable.from(requests) 
        .flatMap(request -> 
         Observable.zip(
          feignClientA.sendRequest(request.A()), 
          feignClientB.sendRequest(request.B()), 
          (a, b) -> new C(a,b) 
         ) 
        , 8) // maximum concurrent HTTP requests 
        .map(both -> doSomeWork(both)); 
    } 
    
    // for legacy users of the API 
    public Collection<D> getAllBlocking(Iterable<RequestPair> requests) { 
        return getAll(requests) 
         .toList() 
         .toBlocking() 
         .first(); 
    } 
    

    forEachがoperatiの全配列をトリガし、私はメインスレッドはforEachの()

    Yesになるまで、実際のHTTP呼び出しが行われませんという理解で正しいアムons。

    forEach()ブロックのコードが別のスレッドで実行されているのを見ましたが、forEach()ブロックに複数のスレッドが存在するかどうかを確認できませんでした。実行は並行していますか?

    一度に1つのだけのスレッドがforEachでラムダを実行することが許可されていますが、確かに別のスレッドが入ることがあります。

    関連する問題