私のAPIは、約100回のダウンストリームコールをペアで2つの別々のサービスにします。クライアントに返答する前に、すべての回答を集約する必要があります。私はhystrix-feignを使ってHTTP呼び出しを行います。observableをblocking observableに変換してrxJavaを乱用していますか?
私は、次の
BlockingObservable演算子を遮断提供観察可能な各種のあることがわかりましたrxJava docsにまでエレガントなソリューションだと信じていたものを思い付きました。テストやデモの目的には便利ですが、一般的には実稼動アプリケーションには適していません(Blocking Observableを使用する必要があると思われる場合は、通常、デザインを再考する必要があります)。
この設定に基づいて
List<Observable<C>> observables = new ArrayList<>();
for (RequestPair request : requests) {
Observable<C> zipped = Observable.zip(
feignClientA.sendRequest(request.A()),
feignClientB.sendRequest(request.B()),
(a, b) -> new C(a,b));
observables.add(zipped);
}
Collection<D> apiResponse = = new ConcurrentLinkedQueue<>();
Observable
.merge(observables)
.toBlocking()
.forEach(combinedResponse -> apiResponse.add(doSomeWork(combinedResponse)));
return apiResponse;
いくつかの質問次のように私のコードは、おおよそになります
- はtoBlocking()されていますメインスレッドがforEach()に到達するまで、実際のHTTP呼び出しは行われません。
- forEach()ブロックは異なるスレッドによって実行されますが、forEach()ブロックに複数のスレッドが存在するかどうかを確認することはできませんでした。実行は並行していますか?