publishOn()
(またはRxJavaの場合はobserveOn()
)演算子がスレッドアフィニティの観点からどのように機能するかはっきり分かりません。私は、任意の加入者が同じスレッドで処理されることを、このオペレータが保証することを思ったが、例えば、以下の私の理解壊れている:私は次の出力を参照結果オペレータ "publishOn()"とスレッドアフィニティプロジェクトで
Flux<String> started = ep.publishOn(scheduler);
Flux<String> afterStateUpdate = started.doOnNext(e -> {
System.out.println("STATE UPDATE : " + Thread.currentThread().getId());
state.add(e);
}).share();
afterStateUpdate.subscribe();
ep.onNext("1");
ep.onNext("2");
ep.onNext("3");
afterStateUpdate.subscribe(e -> {
System.out.println("MAIN SUBSCRIBER : " + Thread.currentThread().getId());
});
ep.onNext("4");
ep.onNext("5");
ep.onNext("6");
を:
STATE UPDATE : 12
STATE UPDATE : 12
STATE UPDATE : 12
STATE UPDATE : 13
MAIN SUBSCRIBER : 13
STATE UPDATE : 13
MAIN SUBSCRIBER : 13
STATE UPDATE : 13
MAIN SUBSCRIBER : 13
をそれは "ことを意味し状態更新プログラム "がスレッド12で動作していましたが、2番目のサブスクライバ「状態更新プログラム」がスレッド13で動作するようになったときに発生します。
したがって、このケースではスレッドの親和性を保証できますか?
あなたは正確にどのプロセッサ(プロセッサを使用しているようです)とどのスケジューラを使用していますか? (私は 'DirectProcessor'と' Schedulers.single() 'を想定していますが、ただの場合) –
EmitterProcessorを使用しています。 – dmitrievanthony
このようなスケジューラー 'Schedulers.fromExecutor(Executors.newFixedThreadPool(10))' – dmitrievanthony