2
私は理解できない非常に奇妙なRxJavaの動作に直面しています。RxJavaスケジューラはスリープ状態のスレッドを変更しません
私は要素の処理を並列化したいとします。出力はかなり予測可能である
public static void log(String msg) {
String threadName = Thread.currentThread().getName();
System.out.println(String.format("%s - %s", threadName, msg));
}
public static void sleep(int ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
Scheduler sA = Schedulers.from(Executors.newFixedThreadPool(1));
Scheduler sB = Schedulers.from(Executors.newFixedThreadPool(5));
Observable.create(s -> {
while (true) {
log("start");
s.onNext(Math.random());
sleep(10);
}
}).subscribeOn(sA)
.flatMap(r -> Observable.just(r).subscribeOn(sB))
.doOnNext(r -> log("process"))
.subscribe((r) -> log("finish"));
}
:私はそのためflatMapを使用してい
pool-1-thread-1 - start
pool-2-thread-1 - process
pool-2-thread-1 - finish
pool-1-thread-1 - start
pool-2-thread-2 - process
pool-2-thread-2 - finish
pool-1-thread-1 - start
pool-2-thread-3 - process
pool-2-thread-3 - finish
まあOK、しかしflatMapの並列化スケジューラがスレッドを変更停止した後、私はマップにN> 10で睡眠を追加した場合。以下与えるもの
public static void main(String[] args) throws InterruptedException {
Scheduler sA = Schedulers.from(Executors.newFixedThreadPool(1));
Scheduler sB = Schedulers.from(Executors.newFixedThreadPool(5));
Observable.create(s -> {
while (true) {
log("start");
s.onNext(Math.random());
sleep(10);
}
}).subscribeOn(sA)
.flatMap(r -> Observable.just(r).subscribeOn(sB))
.doOnNext(r -> sleep(15))
.doOnNext(r -> log("process"))
.subscribe((r) -> log("finish"));
}
:
pool-1-thread-1 - start
pool-1-thread-1 - start
pool-2-thread-1 - process
pool-2-thread-1 - finish
pool-1-thread-1 - start
pool-1-thread-1 - start
pool-2-thread-1 - process
pool-2-thread-1 - finish
pool-1-thread-1 - start
pool-2-thread-1 - process
WHY?なぜ、すべての要素がflatMapの後に同じスレッド(pool-2-thread-1)で処理されているのですか?
ありがとうございます。そしてなぜ私はこのスレッドに睡眠なしで覗き見ていないのですか? – corvax
ソース間に非決定的なエミッション競合があり、一部のスレッドは他のスレッドの要素も放出する可能性があります。 – akarnokd