私はこれまでRxJavaを使っていましたが、反応ストリーム仕様に準拠しているため、projectreactor.ioからreactor-coreで遊び始めています。reactor-core - java.lang.IllegalStateException:キューがいっぱいですか? on Hot Publisher(ConnectableFlux)
次のテストでは、乱数を生成するホットFlux(ConnectableFlux)を作成します。私はそれをすぐにconnect()し、256の値を先読みします(実際にはログに258が表示されます)。私は5秒待って、サブスクライバが後でしばらくするまでサブスクライブしないことをシミュレートします。
メインスレッドが起動すると、RnAppはConnectableFlux、randomNumberGenerator.subscribe(new RnApp());
にサブスクライブします。次にRnApp.onSubscribe()
が呼び出され、10個の要素が要求されます。その後、java.lang.IllegalStateException: Queue full?!
例外が発生します(RnApp.onError()
が呼び出されます)。なぜですか?
加入者:
public class RnApp implements Subscriber<Float>{
private Subscription subscription;
private List<Float> randomNumbers = new ArrayList<Float>();
@Override
public void onComplete() {
System.out.println("onComplete");
}
@Override
public void onError(Throwable err) {
err.printStackTrace();
}
@Override
public void onNext(Float f) {
if(this.randomNumbers.size()>=10){
this.subscription.cancel();
}else{
this.randomNumbers.add(f);
}
}
@Override
public void onSubscribe(Subscription subs) {
this.subscription = subs;
this.subscription.request(10);
}
}
出版社のテスト:
@Test
public void randomNumberReading() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
ConnectableFlux<Float> randomNumberGenerator = ConnectableFlux.<Float>create((c) -> {
SecureRandom sr = new SecureRandom();
int i = 1;
while(true){
try {
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("-----------------------------------------------------"+(i++));
c.onNext(sr.nextFloat());
}
}).log().subscribeOn(Computations.concurrent()).publish();
randomNumberGenerator.connect();
Thread.sleep(5000);
randomNumberGenerator.subscribe(new RnApp());
latch.await();
}
ログイン:RxJavaと同様に
11:12:05.125 [main] DEBUG r.core.util.Logger$LoggerFactory - Using Slf4j logging framework
11:12:05.363 [concurrent-1] INFO reactor.core.publisher.FluxLog - onSubscribe(io.pivotal.literx.Part10SubscribeOnPublishOn$$Lambda$1/[email protected])
11:12:05.371 [concurrent-1] INFO reactor.core.publisher.FluxLog - request(256)
-----------------------------------------------------1
11:12:06.000 [concurrent-1] INFO reactor.core.publisher.FluxLog - onNext(0.39189225)
-----------------------------------------------------2
...
-----------------------------------------------------257
11:12:08.683 [concurrent-1] INFO reactor.core.publisher.FluxLog - onNext(0.34729618)
-----------------------------------------------------258
11:12:08.697 [concurrent-1] INFO reactor.core.publisher.FluxLog - onNext(0.7729547)
java.lang.IllegalStateException: Queue full?!
at reactor.core.publisher.FluxPublish$State.onNext(FluxPublish.java:246)
at reactor.core.publisher.FluxSubscribeOn$SubscribeOnPipeline.onNext(FluxSubscribeOn.java:134)
at reactor.core.publisher.FluxLog$LoggerBarrier.doNext(FluxLog.java:130)
at reactor.core.subscriber.SubscriberBarrier.onNext(SubscriberBarrier.java:85)
at reactor.core.subscriber.SubscriberWithContext.onNext(SubscriberWithContext.java:92)
at io.pivotal.literx.Part10SubscribeOnPublishOn.lambda$1(Part10SubscribeOnPublishOn.java:132)
at reactor.core.publisher.FluxGenerate$ForEachBiConsumer.accept(FluxGenerate.java:145)
at reactor.core.publisher.FluxGenerate$ForEachBiConsumer.accept(FluxGenerate.java:114)
at reactor.core.publisher.FluxGenerate$SubscriberProxy.request(FluxGenerate.java:245)
at reactor.core.subscriber.SubscriberBarrier.doRequest(SubscriberBarrier.java:146)
at reactor.core.publisher.FluxLog$LoggerBarrier.doRequest(FluxLog.java:160)
at reactor.core.subscriber.SubscriberBarrier.request(SubscriberBarrier.java:135)
at reactor.core.util.DeferredSubscription.set(DeferredSubscription.java:71)
at reactor.core.publisher.FluxSubscribeOn$SubscribeOnPipeline.onSubscribe(FluxSubscribeOn.java:129)
at reactor.core.publisher.FluxLog$LoggerBarrier.doOnSubscribe(FluxLog.java:122)
at reactor.core.subscriber.SubscriberBarrier.onSubscribe(SubscriberBarrier.java:67)
at reactor.core.publisher.FluxGenerate.subscribe(FluxGenerate.java:72)
at reactor.core.publisher.FluxLog.subscribe(FluxLog.java:67)
at reactor.core.publisher.FluxSubscribeOn$SourceSubscribeTask.run(FluxSubscribeOn.java:363)
at reactor.core.publisher.Computations$ProcessorWorker.onNext(Computations.java:919)
at reactor.core.publisher.Computations$ProcessorWorker.onNext(Computations.java:883)
at reactor.core.publisher.WorkQueueProcessor$QueueSubscriberLoop.run(WorkQueueProcessor.java:842)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
ありがとうございます。 Flux.using states: '各サブスクライバごとにサプライヤによって生成されたリソースを使用する.'これは、フラックスが異なるものを提供することを意味するのでしょうか?各サブスクライバのランダムな値?2)元のケースでは、create()を使用してバックプレッシャをどのように処理できますか? – codependent
同時にsecureRandomFluxを聞いている人は、同じ正確な値を取得します。これはあなたの意図として現れます。サブスクライバのリクエスト量を追跡し、値を受け取る準備ができているかどうかを確認する必要があります。 – akarnokd
Everythinngが解決しました:-)最後の1つです。ホット・オブザーバブルに関しては、この場合、プリフェッチ値は気にしませんが、リアルタイム・ソース(センサーなど)の場合は、何もプリフェッチしたくありません。しかし、私たちは 'publish(n)'に値> = 1を設定しなければなりません。あなたは先読みしない方法を知っていますか?御時間ありがとうございます。 – codependent