2017-04-19 18 views
0

プロデューサ(APIリクエスト)がイベントをあまりに速く放出していて、コンシューマ(APIレスポンス)がAPIリクエストの2番目のレスポンスしか取得していないときにRxJavaに奇妙な問題が発生しています。Rxjava - hot observables

たとえば、サーバーからのクエリの一部に時間がかかるため、サーバーに送信してデータベースにクエリを送信する必要があるとします。だから、私が応答を受け取ったとき、第2の要求が最初に返される可能性があり、奇妙なことは時々私が最初の要求の応答を得ないことである。

API呼び出しを行うコード:

public void sendRequests() { 
    // using RxJava to make server polling. 
    startPollingServer(); 
} 

startPollingServer()メソッドは、サーバーからデータをポーリングするRxJavaを使用することです。

public void startPollingServer() { 
    mApiService.getPollingFromServer() 
     .retryWhen() 
     .repeatWhen() 
     .map() 
     .subscribeOn() 
     .observeOn() 
     .subscribe(
      // call onNext 
      // call onError 
     ) 
} 

しかし、ときはsendRequests()onNext、onCompleteののどちらも、あまりにも速く呼び出している、のonErrorと呼ばれています。最初の要求に対して何も起こりません。 しかし私はCharlesProxyからの最初の応答を得ています。これは本当に奇妙です。

私の質問は、RxJavaが最初の応答を無視する可能性がありますか? startPollingServer()から来るObservablesをマージする必要がありますか?

+0

Androidアプリケーションやテストで使用しますか?なぜなら、呼び出し元のスレッドがサブスクライブし、メソッドstartPollingServerを経由するだけなので、テストの場合は、スレッドが終了するからです。 voidメソッドをサブスクライブするだけではお勧めできません。サブスクリプションを廃棄しないと、メモリリークが発生します。 startPollingメソッドはObservable <>を返し、1か所で作成します。 –

+0

私はアンドロイドアプリケーションでsendRequests()を使用していますが、ユーザがボタンやUIを非常に速く押すとシミュレートしています。それはちょうどいくつかのコードスニペットを示して、私はアプリケーションが破壊されたときにサブスクリプション購読を処理するコードを持っているので、メモリリークはありません – Cheng

+0

なぜそれはRxJavaの問題です、確かにサーバーは本当にそれぞれのケースで応答を返しますか?Observablesの独立したストリームがここにあるようです。 – yosriz

答えて

0

私の2年以上のRxJavaの経験によると、データが失われる可能性は非常に低いです。 物事次の点を考慮

エラーはエラーの抑制につながる可能性がありretryWhenとrepeatWhenを使用して

を抑制され、それらをキャッチする ')(のonError' 演算子を追加してみてください:

public void startPollingServer() { 
    mApiService.getPollingFromServer() 
     .doOnError(throwable -> log.error("Got an error", throwable)) // catch error 
     .retryWhen() 
     .repeatWhen() 
     .map() 
     .subscribeOn() 
     .observeOn() 
     .subscribe(
      // call onNext 
      // call onError 
     ) 
} 

http://reactivex.io/RxJava/javadoc/rx/Observable.html#doOnError(rx.functions.Action1)

正しくない並列性

Observable.just(1,2,3) 
.flatMap(i -> doNetworkCall(i)) 
.first() 

このようなコードは、「doNetworkCall(i)」の並列実行をもたらす可能性があります。この場合、2番目の応答が最初より速くなる場合があります。保証し、ストリームの順序が処理され、代わりに 'concatMap' を使用し、検証する:http://reactivex.io/documentation/operators/concat.html

デバッグを

演算子 'doOnNext()'、 'doOnSubscribe()'、 'doOnCompleted()' と ' doOnError() 'はエラーを見つけるのに役立ちます

Observable.just(1,2,3) 
.flatMap(i -> doNetworkCall(i) 
      .doOnSubscribe(() -> log.debug("Launched {}", i)) 
      .doOnNext(response -> log.debug("Got response {} for {}", response, i)) 
      .doOnError(throwable -> log.error("For error for " + i, throwable)) 
      .doOnComplete(() -> log.info("Finished processing of {}", i)) 
) 
.first() 
関連する問題