2016-11-02 5 views
1

rxjavaマージの仕組みを理解しようとしています。ので、ここでIは、加入者に結果をマージするRxJavaマージサブスクライバは、最初に観測可能な結果のみを取得します。

のようであろうと期待された2回の観測の結果をマージし

Observable.merge(getObservable(), getTimedObservable()) 
       .subscribeOn(Schedulers.io()) 
       .observeOn(AndroidSchedulers.mainThread()) 
       .subscribe(new Action1<String>() { 
        @Override public void call(final String s) { 
         Log.i("test", s); 
        } 
       }); 

    private Observable<String> getTimedObservable() { 
     return Observable.interval(150, TimeUnit.MILLISECONDS) 
       .map(new Func1<Long, String>() { 
        @Override public String call(final Long aLong) { 
         Log.i("test", "tick thread: " + Thread.currentThread().getId()); 
         return String.valueOf(aLong); 
        } 
       }); 
    } 

    public Observable<String> getObservable() { 
     return Observable.create(new Observable.OnSubscribe<String>() { 
      @Override public void call(final Subscriber<? super String> subscriber) { 
       try { 
        Log.i("test", "simple observable thread: " + Thread.currentThread().getId()); 
        for (int i = 1; i <= 10; i++) { 
         subscriber.onNext(String.valueOf(i * 100)); 
         Thread.sleep(300); 
        } 
        subscriber.onCompleted(); 
       } catch (Exception e) { 
        subscriber.onError(e); 
       } 
      } 
     }); 
    } 

を加入者発するべき簡単なコードであります

またはそのような何か、しかし、実際の結果は次のとおりです。

test: simple observable thread: 257 
test: 100 
test: 200 
test: 300 
test: 400 
test: 500 
test: 600 
test: 700 
test: 800 
test: 900 
test: 1000 
test: tick thread: 254 
test: 0 
test: tick thread: 254 
test: 1 
test: tick thread: 254 
test: 2 
test: tick thread: 254 
test: 3 
test: tick thread: 254 
test: 4 
test: tick thread: 254 
test: 5 
test: tick thread: 254 
test: 6 
test: tick thread: 254 
test: 7 
test: tick thread: 254 
test: 8 
test: tick thread: 254 
test: 9 
test: tick thread: 254 
test: 10 
test: tick thread: 254 
test: 11 
test: tick thread: 254 
test: 12 
test: tick thread: 254 
test: 13 

最初に観測可能なブロックでThread.sleepのように見えますが、どのように理解できません。誰か説明できますか?

答えて

3

mergeは両方の観測値を同時に購読します。最初にサブスクライブされるobservableは、呼び出しスレッドで値を生成します。呼び出しスレッドはobservable1によってブロックされるため、observable2は値を生成できません。 SubscribeOnは、登録が行われる場所のみを示します。観察可能な開始がmain-1で値を生成すると言うことができます。すべての下流の値は同じスレッドになります。同時実行は起こっていません。

同時実行性を維持したい場合は、各観測対象について、サブスクリプションを実行する必要がある場所を指定する必要があります。 2つの観測点を持つObservables.mergeがあるとしましょう。 Observable1とObservable2は、いくつかのThreadpoolでsubscribeOnを持っています。各observableは、subscribeOnの指定されたスレッドで値を生成します。あなたは同時性を達成しました。あなたは、私が `` .subscribeOn(Schedulers.ioを())書かれているに関係なく、上の観測をサブスクライブをマージすること、言いたい

@Test 
public void name() throws Exception { 
    Subscription subscribe = Observable.merge(getObservable(), getTimedObservable()) 
      //.observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(s -> { 

       System.out.println("subscription " + s); 
       //Log.i("test", s); 
      }); 


    Thread.sleep(5_000); 
} 

private Observable<String> getTimedObservable() { 
    return Observable.interval(150, TimeUnit.MILLISECONDS) 
      .map(aLong -> { 
       System.out.println("getTimedObservable: " + Thread.currentThread().getId()); 

       //Log.i("test", "tick thread: " + Thread.currentThread().getId()); 
       return String.valueOf(aLong); 
      }).subscribeOn(Schedulers.io()); 
} 

private Observable<String> getObservable() { 
    return Observable.<String>create(subscriber -> { 
     try { 
      for (int i = 1; i <= 10; i++) { 
       System.out.println("getObservable: " + Thread.currentThread().getId()); 
       subscriber.onNext(String.valueOf(i * 100)); 
       Thread.sleep(300); 
      } 
      subscriber.onCompleted(); 
     } catch (Exception e) { 
      subscriber.onError(e); 
     } 
    }).subscribeOn(Schedulers.io()); 
} 
+0

Plaseはで編集された出力を見てみましょう同じスレッド?オブザーバブルでスレッドIDが異なるのはなぜですか? – orium

+0

intervalのデフォルトのsubscribeOn-schedulerがRxJavaの計算スレッドプールであるため、スレッドが異なります。これにはデフォルトのスケジューラーがあります。したがって、値の生成が呼び出し元スレッドと同じスレッド上で行われると、デッドロックが発生します。 getObservableにはデフォルトスケジューラがありません。したがって、subscribeOnで指定されているように、io-schedulerでサブスクライブされます。それが購読されているので、io-threadpool上で値を生成し始めます。 Observable.createが生成を停止するまで、呼び出しスレッドはブロックされます。 getTimedObservableでgetObservableを切り替えると、並行性が得られます –

0

RxJavaはデフォルトでマルチスレッドではなく、すべて同じスレッドで実行されます。マルチスレッドが必要な場合は、スケジューラを使用する必要があります。

getObservable()の末尾に.subscribe(Subscribers.io())を追加します。

関連する問題