2017-08-17 11 views
0

RxJavaとリアクティブプログラミングが新しくなりました。Rx Javaマップ関数を並列に実行する

私は2つの関数を1つのObservableパイプラインの一部として並列にマップしようとしていますが、このようには動作しません。ここに私のコードです。

Observable.fromCallable(thatReturnsNumberOne()) 
       .observeOn(newThread()) 
       .map(doubleIt()) 
       .observeOn(newThread()) 
       .map(doubleIt()) 
       .subscribe(testSubscriber); 

私は2つのdoubleIt()呼び出しを同時に生成したいと思います。しかし、最初のdoubleIt()が終了すると、2つ目のdoubleIt()が開始されます。すなわち、ブロッキング/シーケンシャル。

私には何が欠けていますか?

答えて

1

私はthatReturnsNumberOne()が単一の値を返すと仮定しています。返される値は、順番に各演算子に渡されます。 observeOn(newThread())を使用すると、値がチェーン内のそのポイントに到達したときに新しいスレッドに変更するだけです。

あなたが並列に計算を行いたい場合は、複数の観測を使用する必要があります。

Observable.fromCallable(thatReturnsNumberOne()) 
    .flatMap(number -> Observable.fromCallable(doubleIt(number)).subscribeOn(newThread()) 
     .combineLatest(Observable.fromCallable(doubleIt(number)).subscribeOn(newThread()), 
     doubles -> doubles[0] + doubles[1])) 
    .subscribe(testSubscriber); 
関連する問題