2016-07-27 11 views
0

私はRxJavaを使い慣れました。 Observablesを返すJersey RxJavaクライアントがいくつかあります。いくつかのデータを取得するために1回の呼び出しを行う必要があり、そのデータは次の3回の呼び出しの入力になります。これらの呼び出しを並行して実行したい。最後に、すべてのデータを必要とするすべての呼び出しが完了したら、計算を行いたいと思います。ここでは、それがどのように見えるかです:複数の観測可能な呼び出しを非同期的に呼び出すにはどうすればよいですか?

interface Service { 
    Observable<ResultA> callServiceA(InitialData input); 
    Observable<ResultB> callServiceB(ResultA resultA); 
    Observable<ResultC> callServiceC(ResultA resultA); 
    Observable<ResultD> callServiceD(ResultA resultA); 
    FinalResult simpleCalculation(ResultA a, ResultB b, ResultC c, ResultD d); 
} 

class MyClass{ 

    @Autowired 
    ExecutorService myExecutorService; 

    Observable<FinalResult> myMethod(InitialData initialData){ 
    /* Make call to ServiceA, get the results, then make calls to services B, C, and D in parallel (on different threads), finally perform simpleCalculation, and emit the result */ 
    } 
} 

答えて

0

あなたはその後、flatMapそれが再び完了時に、その後、複数の呼び出しを送信するために郵便番号を使用するか、マージ、同期呼び出しを行うためにflatMapを使用することができます。

+1

をあなたは私のサンプルコードを与えることはできますか? – Adam

4

flatMap()およびzip()は、この状況のあなたの友人です。観察可能なリターンを使用して

Observable<FinalResult> myMethod(InitialData initialData) { 
    return service 
      .callServiceA(initialData) 
      .flatMap(resultA -> Observable.zip(
        service.callServiceB(resultA), 
        service.callServiceC(resultA), 
        service.callServiceD(resultA), 
        (resultB, resultC, resultD) -> 
         service.simpleCalculation(resultA, resultB, resultC, resultD)) 
      ); 
} 

は、次のようになります。

Subscription subscription = 
     myMethod(new InitialData()) 
       .subscribe(finalResult -> { 
          // FinalResult will end up here. 
         }, 
         throwable -> { 
          // Handle all errors here. 
         }); 
+0

ありがとう!どのようにして、サービスB、C、Dを別のスレッドで並列に呼び出すことができますか?スケジューラを追加する必要はありませんか? – Adam

+0

ネットワークコールにRetrofitを使用している人が増えています。自動的に並行してコールをスケジュールします。これらのサービスを同期的に実行している場合は、各呼び出しに.subscribeOn(Schedulers.io())を追加します。 – kjones

+0

私はRetrofitを使用していません。私はJersey Clientを使用しています。過去に私は.observeOn(Schedulers.from(executorService))をチェーンの最後に追加しました。 .subscribeOnとどのように違いますか? – Adam

関連する問題