私はBlocking ConsumerをReactor Aluminum-SR1のFluxサブスクライバとして統合しようとしています。私は並列のスケジューラを使用してブロック操作を同時に実行したいと思います。FluxのpublishOnが期待どおりに動作しない
私は私の意図を記述するために、メインクラスを実装しました:
package etienne.peiniau;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
public class Main {
public static void main(String[] args) throws InterruptedException {
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
.elapsed()
.publishOn(Schedulers.parallel())
.subscribe(new Subscriber<Tuple2<Long, Integer>>() {
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("[" + Thread.currentThread().getName() + "] Subscription");
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Tuple2<Long, Integer> t2) {
System.out.println("[" + Thread.currentThread().getName() + "] " + t2);
try {
Thread.sleep(1000); // long operation
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable throwable) {
System.err.println("[" + Thread.currentThread().getName() + "] Error: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("[" + Thread.currentThread().getName() + "] Complete");
}
});
// Waiting for the program to complete
System.out.println("[" + Thread.currentThread().getName() + "] Main");
Thread.sleep(100000);
}
}
このコードの出力は以下の通りです:
[main] Subscription
[main] Main
[parallel-1] [3,1]
[parallel-1] [1000,2]
[parallel-1] [1001,3]
[parallel-1] [1000,4]
[parallel-1] [1000,5]
[parallel-1] [1000,6]
[parallel-1] [1001,7]
[parallel-1] [1000,8]
[parallel-1] [1000,9]
[parallel-1] [1000,10]
[parallel-1] [1000,11]
[parallel-1] [1001,12]
[parallel-1] [1000,13]
[parallel-1] [1000,14]
[parallel-1] [1000,15]
[parallel-1] [1000,16]
[parallel-1] [1001,17]
[parallel-1] [1000,18]
[parallel-1] [1000,19]
[parallel-1] [1000,20]
[parallel-1] Complete
私の問題は、長い操作は常にオンに実行されていることですスレッドはパラレル1で1秒ごとに実行されます。
私は並列処理を手動で増やすか、弾性スケジューラを使用しようとしましたが、結果は同じです。
私はpublishOnメソッドがこのユースケースのために特別に設計されたと考えていました。私が何かを誤解したかどうか教えていただけますか?
@etiennepeiniauと
またはその他のオーバーロードされたメソッド私は単純なFluxでは "並列==もっとスレッド"ではないと思います。私は私の答えを編集し、例を追加します。 –