2016-04-11 22 views
0

新しい値が前の値と異なっている場合にのみ、(直前に-1で始まる) 。さらに、新しい加入者に最新の価値を即座に発信したいと考えています。私がいないときしかし、これは、興味深いことにjava.lang.IllegalStateException: more produced than requested で新しい加入者に放出される(常に-1にかかわらず、hotObservableobservableに加入する前に発するものの)最初の値の後に失敗した`replay`と` autoConnect`を使用しているときに "要求より多く生成されました"という例外が発生しました

PublishSubject<Integer> hotObservable = PublishSubject.create(); 

Observable<Integer> observable = hotObservable 
     .startWith(-1) 
     .distinctUntilChanged() 
     .replay(1) 
     .autoConnect(0); 

:私は、次のコードを作ってみました

Observable<Integer> observable = hotObservable 
     .startWith(-1) 
     .distinctUntilChanged() 
     .replay(1) 
     .autoConnect(); 

observable.subscribe().unsubscribe(); 

次のサブスクライバは正常に動作し、最後の値を受信して​​更新します。

私はreplay(1).autoConnect(0)を働かせることができません。私は何かが恋しくなるように感じます。なぜ、autoConnect(0)は、購読したり辞退したりするのでしょうか?そのような観察可能なものを作る正しい方法は何ですか?

ここで私はautoConnect(); observable.subscribe().unsubscribe()を使用しない限り、失敗した試験方法があります:

Observable<Integer> observable = hotObservable 
     .startWith(-1) 
     .distinctUntilChanged() 
     .replay(1) 
     .autoConnect(); // With (0) it fails 

observable.subscribe().unsubscribe(); // Needed if we don't auto connnect 

hotObservable.onNext(1); 
hotObservable.onNext(2); 
hotObservable.onNext(3); // I want this value to be received by new subscriber 

TestSubscriber<Integer> subscriber = TestSubscriber.create(); 
observable.subscribe(subscriber); 

subscriber.assertNoErrors(); 
subscriber.assertValues(3); 

答えて

1

私はRxJava 1.1.3で上記のコードでMore produced than requestedエラーを得ることはありません。

アサーションが失敗する理由は、replayは、そのサブスクライバのいずれかが実際に要求するまで、アップストリームから何も要求しないということです。 TestSubscriberが初めて購読する場合は、startWithが-1を出してからPublishSubjectに切り替えます。このPublishSubjectは値を保持しないので、何も受信しません。

BehaviorSubject<Integer> hotObservable = BehaviorSubject.create(-1); 

Observable<Integer> observable = hotObservable.distinctUntilChanged(); 

hotObservable.onNext(1); 
hotObservable.onNext(2); 
hotObservable.onNext(3); 

TestSubscriber<Integer> subscriber = TestSubscriber.create(); 
observable.subscribe(subscriber); 

subscriber.assertNoErrors(); 
subscriber.assertValue(3); 
+0

1.1.3'が働く 'バージョンを確認することができます:

は、私はあなたが非常に最後の値を保持し、新規加入者のためのもので始まるBehaviorSubjectで探しているものと信じて!以前は「1.1.2」だった。 'BehaviorSubject'については興味深いですが、実際には' hotObservable'は他の場所で作成されています。 'BehaviorSubject b = BehaviorSubject.create(-1)です。 hotObservable.subscribe(b); return b.distinctUntilChanged() '有効なアプローチですか? – wasyl

+0

はい、有効なアプローチです。 hotObservableシグナルの値があまりにも速い場合は、 'onBackpressureBuffer'または' onBackpressureDrop'を追加するとよいでしょう。 – akarnokd

+0

私は 'onBackpressureLatest()'を追加しました。あなたの助けをありがとう! – wasyl

関連する問題