rxjavaマージの仕組みを理解しようとしています。ので、ここでIは、加入者に結果をマージするRxJavaマージサブスクライバは、最初に観測可能な結果のみを取得します。
のようであろうと期待された2回の観測の結果をマージしObservable.merge(getObservable(), getTimedObservable()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<String>() { @Override public void call(final String s) { Log.i("test", s); } }); private Observable<String> getTimedObservable() { return Observable.interval(150, TimeUnit.MILLISECONDS) .map(new Func1<Long, String>() { @Override public String call(final Long aLong) { Log.i("test", "tick thread: " + Thread.currentThread().getId()); return String.valueOf(aLong); } }); } public Observable<String> getObservable() { return Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(final Subscriber<? super String> subscriber) { try { Log.i("test", "simple observable thread: " + Thread.currentThread().getId()); for (int i = 1; i <= 10; i++) { subscriber.onNext(String.valueOf(i * 100)); Thread.sleep(300); } subscriber.onCompleted(); } catch (Exception e) { subscriber.onError(e); } } }); }
を加入者発するべき簡単なコードであります
またはそのような何か、しかし、実際の結果は次のとおりです。
test: simple observable thread: 257
test: 100
test: 200
test: 300
test: 400
test: 500
test: 600
test: 700
test: 800
test: 900
test: 1000
test: tick thread: 254
test: 0
test: tick thread: 254
test: 1
test: tick thread: 254
test: 2
test: tick thread: 254
test: 3
test: tick thread: 254
test: 4
test: tick thread: 254
test: 5
test: tick thread: 254
test: 6
test: tick thread: 254
test: 7
test: tick thread: 254
test: 8
test: tick thread: 254
test: 9
test: tick thread: 254
test: 10
test: tick thread: 254
test: 11
test: tick thread: 254
test: 12
test: tick thread: 254
test: 13
最初に観測可能なブロックでThread.sleepのように見えますが、どのように理解できません。誰か説明できますか?
:
Plaseはで編集された出力を見てみましょう同じスレッド?オブザーバブルでスレッドIDが異なるのはなぜですか? – orium
intervalのデフォルトのsubscribeOn-schedulerがRxJavaの計算スレッドプールであるため、スレッドが異なります。これにはデフォルトのスケジューラーがあります。したがって、値の生成が呼び出し元スレッドと同じスレッド上で行われると、デッドロックが発生します。 getObservableにはデフォルトスケジューラがありません。したがって、subscribeOnで指定されているように、io-schedulerでサブスクライブされます。それが購読されているので、io-threadpool上で値を生成し始めます。 Observable.createが生成を停止するまで、呼び出しスレッドはブロックされます。 getTimedObservableでgetObservableを切り替えると、並行性が得られます –