私は、次のコードを持っている:RxJava:観察可能およびデフォルトのスレッド
map: thread=background-thread-1
onNext: thread=background-thread-1, value=map-1
onComplete: thread=background-thread-1
重要詳細:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
s.onNext("1");
s.onComplete();
}
});
thread.setName("background-thread-1");
thread.start();
}
}).map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
String threadName = Thread.currentThread().getName();
logger.logDebug("map: thread=" + threadName);
return "map-" + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(String s) {
String threadName = Thread.currentThread().getName();
logger.logDebug("onNext: thread=" + threadName + ", value=" + s);
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {
String threadName = Thread.currentThread().getName();
logger.logDebug("onComplete: thread=" + threadName);
}
});
そして、ここでは出力です私は別のスレッド(main
スレッドからsubscribe
メソッドを呼んでいますAndroidで)。
Observable
クラスは、同期、デフォルトであり、それは右、イベント(s.onNext
)を発する同じスレッド上のすべて(map
+通知する加入者のような演算子を)実行のようにそうに見えますか?私は疑問に思います...それは意図された行動ですか、私は何か誤解しましたか?実際には、私は少なくともonNext
とonComplete
のコールバックが発信イベントではなく呼び出し元のスレッドで呼び出されると予想していました。この特定のケースでは、実際の発信者のスレッドは問題ではないことを正しく理解していますか?少なくともイベントが非同期的に生成されるとき。
もう1つの懸念事項 - Observableを外部ソースからパラメータとして受け取った場合(つまり、私自身は生成しません)、そのユーザーが自分であるかどうかを確認する方法はありません同期または非同期で、私はsubscribeOn
とobserveOn
の方法でコールバックを受け取る場所を明示的に指定するだけです。
ありがとうございます!