2

publishOn()(またはRxJavaの場合はobserveOn())演算子がスレッドアフィニティの観点からどのように機能するかはっきり分かりません。私は、任意の加入者が同じスレッドで処理されることを、このオペレータが保証することを思ったが、例えば、以下の私の理解壊れている:私は次の出力を参照結果オペレータ "publishOn()"とスレッドアフィニティプロジェクトで

Flux<String> started = ep.publishOn(scheduler); 
Flux<String> afterStateUpdate = started.doOnNext(e -> { 
    System.out.println("STATE UPDATE : " + Thread.currentThread().getId()); 
    state.add(e); 
}).share(); 
afterStateUpdate.subscribe(); 
ep.onNext("1"); 
ep.onNext("2"); 
ep.onNext("3"); 

afterStateUpdate.subscribe(e -> { 
    System.out.println("MAIN SUBSCRIBER : " + Thread.currentThread().getId()); 
}); 
ep.onNext("4"); 
ep.onNext("5"); 
ep.onNext("6"); 

を:

STATE UPDATE : 12 
STATE UPDATE : 12 
STATE UPDATE : 12 
STATE UPDATE : 13 
MAIN SUBSCRIBER : 13 
STATE UPDATE : 13 
MAIN SUBSCRIBER : 13 
STATE UPDATE : 13 
MAIN SUBSCRIBER : 13 

をそれは "ことを意味し状態更新プログラム "がスレッド12で動作していましたが、2番目のサブスクライバ「状態更新プログラム」がスレッド13で動作するようになったときに発生します。

したがって、このケースではスレッドの親和性を保証できますか?

+0

あなたは正確にどのプロセッサ(プロセッサを使用しているようです)とどのスケジューラを使用していますか? (私は 'DirectProcessor'と' Schedulers.single() 'を想定していますが、ただの場合) –

+0

EmitterProcessorを使用しています。 – dmitrievanthony

+0

このようなスケジューラー 'Schedulers.fromExecutor(Executors.newFixedThreadPool(10))' – dmitrievanthony

答えて

0

1人の加入者が同じスレッドを使用していることを100%確信する方法が1つしか見つかりませんでした。私を修正してください - 誰でもスレッドの親和性を提供するために、他の方法を知っていればwrapIntoSingleThread(schedulers[i++ % schedulers.length]).subscribe(...)

Scheduler[] schedulers = new Schedulers[10]; 
for (int i = 0; i < 10; i++) { 
    schedulers[i] = Schedulers.newSingle(); 
} 

public Flux<T> wrapIntoSingleThread(Scheduler scheduler) { 
    return this.flux.publishOn(scheduler); 
} 

そして、すべての加入者のために、このプールから1つのスケジューラを取る:それはこのように、単一のスケジューラのプールを作成することです。

関連する問題