私はRxJavaに新しいですし、リンクから複数の観測のための並列実行の例を実行しようとしていた。 RxJava Fetching Observables In Parallelなぜ複数のRXJavaオブザーバブルで並列実行が行われないのですか?
上記のリンクで提供された例は、並行して観測を行っているが、私はスレッドを追加したとき。 sleep(TIME_IN_MILLISECONDS)forEachメソッドでは、システムはObservableを一度に1つずつ実行し始めました。 Thread.sleepがObservablesの並列実行を停止している理由を理解するのを助けてください。我々は、観察のsubscribeOn方法を使用して実行するためのThreadPool(Schedules.io)を提供している上記の例で
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class ParallelExecution {
public static void main(String[] args) {
System.out.println("------------ mergingAsync");
mergingAsync();
}
private static void mergingAsync() {
Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking()
.forEach(x -> { try{Thread.sleep(4000);}catch(Exception ex){};
System.out.println(x + " " + Thread.currentThread().getId());});
}
// artificial representations of IO work
static Observable<Integer> getDataAsync(int i) {
return getDataSync(i).subscribeOn(Schedulers.io());
}
static Observable<Integer> getDataSync(int i) {
return Observable.create((Subscriber<? super Integer> s) -> {
// simulate latency
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
s.onNext(i);
s.onCompleted();
});
}
}
:
以下は、複数の観測の同期実行を引き起こしている変形例でありますObservableごとのサブスクリプションは別のスレッドで行われます。
Thread.sleepがスレッド間で共有オブジェクトをロックしている可能性はありますが、それでもクリアされていません。助けてください。
ソースが並行して実行されなかったことをどのように知っていますか? – akarnokd
私はこれを知っています現在実行中のスレッドIDを印刷します。 –