2016-05-03 12 views
3

私はこれまでRxJavaを使っていましたが、反応ストリーム仕様に準拠しているため、projectreactor.ioからreactor-coreで遊び始めています。reactor-core - java.lang.IllegalStateException:キューがいっぱいですか? on Hot Publisher(ConnectableFlux)

次のテストでは、乱数を生成するホットFlux(ConnectableFlux)を作成します。私はそれをすぐにconnect()し、256の値を先読みします(実際にはログに258が表示されます)。私は5秒待って、サブスクライバが後でしばらくするまでサブスクライブしないことをシミュレートします。

メインスレッドが起動すると、RnAppはConnectableFlux、randomNumberGenerator.subscribe(new RnApp());にサブスクライブします。次にRnApp.onSubscribe()が呼び出され、10個の要素が要求されます。その後、java.lang.IllegalStateException: Queue full?!例外が発生します(RnApp.onError()が呼び出されます)。なぜですか?

加入者

public class RnApp implements Subscriber<Float>{ 

    private Subscription subscription; 
    private List<Float> randomNumbers = new ArrayList<Float>(); 

    @Override 
    public void onComplete() { 
     System.out.println("onComplete"); 
    } 

    @Override 
    public void onError(Throwable err) { 
     err.printStackTrace(); 
    } 

    @Override 
    public void onNext(Float f) { 
     if(this.randomNumbers.size()>=10){ 
      this.subscription.cancel(); 
     }else{ 
      this.randomNumbers.add(f); 
     } 
    } 

    @Override 
    public void onSubscribe(Subscription subs) { 
     this.subscription = subs; 
     this.subscription.request(10); 
    } 
} 

出版社のテスト

@Test 
public void randomNumberReading() throws InterruptedException { 

    CountDownLatch latch = new CountDownLatch(1); 
    ConnectableFlux<Float> randomNumberGenerator = ConnectableFlux.<Float>create((c) -> { 
     SecureRandom sr = new SecureRandom(); 
     int i = 1; 
     while(true){ 
      try { 
       Thread.sleep(10); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
      System.out.println("-----------------------------------------------------"+(i++)); 
      c.onNext(sr.nextFloat()); 
     } 
    }).log().subscribeOn(Computations.concurrent()).publish(); 

    randomNumberGenerator.connect(); 

    Thread.sleep(5000); 

    randomNumberGenerator.subscribe(new RnApp()); 

    latch.await(); 
} 

ログイン:RxJavaと同様に

11:12:05.125 [main] DEBUG r.core.util.Logger$LoggerFactory - Using Slf4j logging framework 
11:12:05.363 [concurrent-1] INFO reactor.core.publisher.FluxLog - onSubscribe(io.pivotal.literx.Part10SubscribeOnPublishOn$$Lambda$1/[email protected]) 
11:12:05.371 [concurrent-1] INFO reactor.core.publisher.FluxLog - request(256) 
-----------------------------------------------------1 
11:12:06.000 [concurrent-1] INFO reactor.core.publisher.FluxLog - onNext(0.39189225) 
-----------------------------------------------------2 
... 
-----------------------------------------------------257 
11:12:08.683 [concurrent-1] INFO reactor.core.publisher.FluxLog - onNext(0.34729618) 
-----------------------------------------------------258 
11:12:08.697 [concurrent-1] INFO reactor.core.publisher.FluxLog - onNext(0.7729547) 
java.lang.IllegalStateException: Queue full?! 
    at reactor.core.publisher.FluxPublish$State.onNext(FluxPublish.java:246) 
    at reactor.core.publisher.FluxSubscribeOn$SubscribeOnPipeline.onNext(FluxSubscribeOn.java:134) 
    at reactor.core.publisher.FluxLog$LoggerBarrier.doNext(FluxLog.java:130) 
    at reactor.core.subscriber.SubscriberBarrier.onNext(SubscriberBarrier.java:85) 
    at reactor.core.subscriber.SubscriberWithContext.onNext(SubscriberWithContext.java:92) 
    at io.pivotal.literx.Part10SubscribeOnPublishOn.lambda$1(Part10SubscribeOnPublishOn.java:132) 
    at reactor.core.publisher.FluxGenerate$ForEachBiConsumer.accept(FluxGenerate.java:145) 
    at reactor.core.publisher.FluxGenerate$ForEachBiConsumer.accept(FluxGenerate.java:114) 
    at reactor.core.publisher.FluxGenerate$SubscriberProxy.request(FluxGenerate.java:245) 
    at reactor.core.subscriber.SubscriberBarrier.doRequest(SubscriberBarrier.java:146) 
    at reactor.core.publisher.FluxLog$LoggerBarrier.doRequest(FluxLog.java:160) 
    at reactor.core.subscriber.SubscriberBarrier.request(SubscriberBarrier.java:135) 
    at reactor.core.util.DeferredSubscription.set(DeferredSubscription.java:71) 
    at reactor.core.publisher.FluxSubscribeOn$SubscribeOnPipeline.onSubscribe(FluxSubscribeOn.java:129) 
    at reactor.core.publisher.FluxLog$LoggerBarrier.doOnSubscribe(FluxLog.java:122) 
    at reactor.core.subscriber.SubscriberBarrier.onSubscribe(SubscriberBarrier.java:67) 
    at reactor.core.publisher.FluxGenerate.subscribe(FluxGenerate.java:72) 
    at reactor.core.publisher.FluxLog.subscribe(FluxLog.java:67) 
    at reactor.core.publisher.FluxSubscribeOn$SourceSubscribeTask.run(FluxSubscribeOn.java:363) 
    at reactor.core.publisher.Computations$ProcessorWorker.onNext(Computations.java:919) 
    at reactor.core.publisher.Computations$ProcessorWorker.onNext(Computations.java:883) 
    at reactor.core.publisher.WorkQueueProcessor$QueueSubscriberLoop.run(WorkQueueProcessor.java:842) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 

答えて

4

、あなたは、AR場合e create()を使用すると、あなたはキャンセルとバックプレッシャーを自分で処理しています。代わりに、標準の演算子から発電機を構築することができ:

ConnectableFlux<Double> secureRandomFlux = Flux.using(
    () -> new SecureRandom(), 
    sr -> Flux.interval(10, TimeUnit.MILLISECONDS) 
      .map(v -> sr.nextDouble()) 
      .onBackpressureDrop() 
    sr -> { } 
).publish(); 
+0

ありがとうございます。 Flux.using states: '各サブスクライバごとにサプライヤによって生成されたリソースを使用する.'これは、フラックスが異なるものを提供することを意味するのでしょうか?各サブスクライバのランダムな値?2)元のケースでは、create()を使用してバックプレッシャをどのように処理できますか? – codependent

+0

同時にsecureRandomFluxを聞いている人は、同じ正確な値を取得します。これはあなたの意図として現れます。サブスクライバのリクエスト量を追跡し、値を受け取る準備ができているかどうかを確認する必要があります。 – akarnokd

+0

Everythinngが解決しました:-)最後の1つです。ホット・オブザーバブルに関しては、この場合、プリフェッチ値は気にしませんが、リアルタイム・ソース(センサーなど)の場合は、何もプリフェッチしたくありません。しかし、私たちは 'publish(n)'に値> = 1を設定しなければなりません。あなたは先読みしない方法を知っていますか?御時間ありがとうございます。 – codependent

3

create()は、コールバックinvokationにつき1 onNextを期待しています。または、Flux.yieldをチェックすると、下流の状態(バックプレッシャまたはキャンセル)を処理するための特別な発光方法が得られます。または、createのようなFlux.generateを使用することもできますが、要求ごとに1回呼び出されるので、渡された要求に効果的にonNextすることができます。

これらの3つのFluxジェネレータは現在議論されているので、http://github.com/reactor/reactor-core/issuesでより良い代替案を検討することができます。

また、Flux#delaySubscriptionを使用して公開プリフェッチを防止することもできます。最新のスナップショットUnicastProcessor:

UnicastProcessor<Object> p = UnicastProcessor.create(); 
flux.delaySubscription(p).publish(128).autoConnect().subscribe(); 

//... 
p.onNext(new Object()); 
関連する問題