2017-06-19 2 views
0

私はRxJavaに新しいですし、リンクから複数の観測のための並列実行の例を実行しようとしていた。 RxJava Fetching Observables In Parallelなぜ複数のRXJavaオブザーバブルで並列実行が行われないのですか?

上記のリンクで提供された例は、並行して観測を行っているが、私はスレッドを追加したとき。 sleep(TIME_IN_MILLISECONDS)forEachメソッドでは、システムはObservableを一度に1つずつ実行し始めました。 Thread.sleepがObservablesの並列実行を停止している理由を理解するのを助けてください。我々は、観察のsubscribeOn方法を使用して実行するためのThreadPool(Schedules.io)を提供している上記の例で

import rx.Observable; 
import rx.Subscriber; 
import rx.schedulers.Schedulers; 

public class ParallelExecution { 

    public static void main(String[] args) { 
     System.out.println("------------ mergingAsync"); 
     mergingAsync(); 
    } 

    private static void mergingAsync() { 
     Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking() 
     .forEach(x -> { try{Thread.sleep(4000);}catch(Exception ex){}; 
     System.out.println(x + " " + Thread.currentThread().getId());}); 
    } 

    // artificial representations of IO work 
    static Observable<Integer> getDataAsync(int i) { 
     return getDataSync(i).subscribeOn(Schedulers.io()); 
    } 

    static Observable<Integer> getDataSync(int i) { 
     return Observable.create((Subscriber<? super Integer> s) -> { 
      // simulate latency 
      try { 
       Thread.sleep(1000); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
      s.onNext(i); 
      s.onCompleted(); 
     }); 
    } 
} 

以下

は、複数の観測の同期実行を引き起こしている変形例でありますObservableごとのサブスクリプションは別のスレッドで行われます。

Thread.sleepがスレッド間で共有オブジェクトをロックしている可能性はありますが、それでもクリアされていません。助けてください。

+0

ソースが並行して実行されなかったことをどのように知っていますか? – akarnokd

+0

私はこれを知っています現在実行中のスレッドIDを印刷します。 –

答えて

2

実際に、あなたの例では、パラレル実行が起こっていますが、あなたはそれを間違って見ているだけで、作業が実行される場所と通知が発行される場所に違いがあります。

スレッドIDがObservable.createのログを保存すると、Observableが別のスレッドで同時に実行されていることがわかります。通知は連続して発生します。 Observableコントラクトの一部として期待されているように、オブザーバブルはオブザーバにシリアルに(パラレルではなく)通知を発行する必要があります。

関連する問題