2017-09-14 3 views
0

私はAPI呼び出しを行うためのrxメソッドを持っていますが、このメソッドの呼び出し元は非常に短時間で複数回発生する可能性があります。だから、RXメソッドは、呼び出し元のメソッドは、これは短時間で複数回apiCallWithRx呼び出すことができますrxjavaはダウンストリームの結果をドロップします

public void apiCallWithRx() { 
    apiService.makeApiCall() 
     .subscribeOn(Schecdulars.io()) 
     .observeOn(AndroidSchedulers.mainTread()) 
     .subscribe(
      // onNext 
      new onConsume(), 
      // onError 
      new onConsume(), 

     ); 
} 

.. だろうしかし、問題は、私は時々、二度目から呼び出すときdowntreamからの応答を取得couldntの、または任意の特定のことです時間。 onNext、onError、onCompleteのいずれも呼び出されません。 rxjava1とrxjava2を試してみましたが、それらは同じです。

アドバイスをいただければ幸いです。

UPDATE 1

私は、任意の背圧例外を参照してくださいdidntの、それは背圧問題になるcouldntの。

UPDATE 2

Rxのコードは、ほとんどの時間の作品、細部を無視してください。私はちょうど私が、バックグラウンドでのBlockingQueueを持っているイラストの目的

UPDATE 3

のためのいくつかのコードを省略し、そのキューで利用可能なデータがあるときに、このRXメソッドが実際に呼び出されます。データはいつでもキューに追加できます。 このrxメソッドはではなく、という非同期にと呼ばれます。このメソッドは最初の応答の後にのみ呼び出され、キューがある場合はデータをチェックしてから2番目のapi要求を送信します。

+0

を別の例を見ることができます

チェックこのテスト

@Test public void testObservableAsync() throws InterruptedException { Subscription subscription = Observable.from(numbers) .doOnNext(increaseTotalItemsEmitted()) .subscribeOn(Schedulers.newThread()) .subscribe(number -> System.out.println("Items emitted:" + total)); System.out.println("I finish before the observable finish. Items emitted:" + total); new TestSubscriber((Observer) subscription) .awaitTerminalEvent(100, TimeUnit.MILLISECONDS); } 

は、デッドロックが発生しやすいです。ダウンストリームがデータを消費するまで、データをバッファリングするためにUnicastSubjectが必要になる可能性があります。 – akarnokd

+0

@akamokd apiCallWithRx()メソッドはアンドロイドのUIスレッドから呼び出されるため、ExecutorServiceはバックグラウンドスレッドでBlockingQueueからデータを取得し、UIスレッドに渡され、UIスレッドでapiCallWithRx()メソッドがトリガされます。また、前回のリクエストデータに対してサーバからのapiレスポンスを取得するたびに、次のリクエストデータを取得するためにブロッキングキューをチェックします。だから、API呼び出しとBlockingQueueはかなり分かれていますが、ここでデッドロックがあるとは思わない – Cheng

答えて

0

Observableはデフォルトでは怠惰であり、Observableを実行するにはsubscribeにする必要があります。

+0

ええ、私たちはそのコードを、説明のために省略しています。 – Cheng

+0

ちょうど、感謝を追加しました – Cheng

+0

あなたのコードは、observableをトリガーするすべてのイベントを無視する 'subscribe'のバージョンを使用します。 –

0

私はRxJava2 subscribeOnobserverOnにオペレータは、それが実行されなければならないスレッドを指定せずに使用することができる理由を知りません。

少なくともRxJava1では、これらの演算子を使用すると、非同期に実行し、結果を得るために実行を待たなければなりません。あなたがRxJavaでのBlockingQueueを使用して、ここでhttps://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java

+0

質問が更新されました。私はBlockingQueueを使用して、rxメソッドが連続して呼び出されるようにします – Cheng

関連する問題