RxJavaが本当に好きです。それは素晴らしいツールですが、時にはそれがどのように動作するのか非常に難しいです。 私たちはAndroidプロジェクトでRxJavaでRetrofitを使用していますが、次のユースケースがあります。"skipWhile"とRxJavaの "repeatWhen"を組み合わせてサーバーポーリングを実装する
サーバーが何らかの仕事をしているうちに、再試行の間に若干の遅延を伴ってサーバーをポーリングする必要があります。サーバーが終了したら、結果を提供する必要があります。私は成功しRxJavaでそれをやったので、ここでのコードスニペットは、次のとおりです。 私はコードが正常に動作します
Subscription checkJobSubscription = mDataManager.checkJob(prepareTweakJob)
.skipWhile(new Func1<CheckJobResponse, Boolean>() {
@Override
public Boolean call(CheckJobResponse checkJobResponse) {
boolean shouldSkip = false;
if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, jobStatus " + checkJobResponse.getJobStatus());
switch (checkJobResponse.getJobStatus()){
case CheckJobResponse.PROCESSING:
shouldSkip = true;
break;
case CheckJobResponse.DONE:
case CheckJobResponse.ERROR:
shouldSkip = false;
break;
}
if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, shouldSkip " + shouldSkip);
return shouldSkip;
}
})
.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
if (SHOW_LOGS) Logger.v(TAG, "checkJob, repeatWhen " + observable);
return observable.delay(1, TimeUnit.SECONDS);
}
}).subscribe(new Subscriber<CheckJobResponse>(){
@Override
public void onNext(CheckJobResponse response) {
if (SHOW_LOGS) Logger.v(TAG, "checkJob, onSuccess, response " + response);
}
@Override
public void onError(BaseError error) {
if (SHOW_LOGS) Logger.v(TAG, "checkJob, onError, canEditTimeline, error " + error);
Toast.makeText(ChoseEditOptionActivity.this, R.string.NETWORK__no_internet_message, Toast.LENGTH_LONG).show();
}
@Override
public void onCompleted() {
if (SHOW_LOGS) Logger.v(TAG, "onCompleted");
}
});
「repeatWhen」と「skipWhile」を使用します。サーバーが応答した場合
そのジョブを処理しているIを"skipWhile"チェーンから "true"を返すと、Observableは元のObservableが1秒間待機し、httpリクエストを再度実行します。 このプロセスは、 "skipWhile"チェーンから "false"を返すまで繰り返されます。私は私が戻るまで、それは本来の観測から何かを(のonError、onNext、onCompleteの)放出しないことを「skipWhile」のドキュメントで見た
:ここ
は、私は理解していないいくつかのことですその "呼び出し"メソッドから "false"を返します。だから、もし何も出さなければ、 "repeatWhen" Observableがそれをやっているのはなぜですか?それは1秒間待ってから再度要求を実行します。誰がそれを開始するのですか?2番目の質問は、「repeatWhen」からの観測が永遠に実行されない理由、「skipWhile」から「false」を返したときに繰り返されない理由です。私は "false"を返した場合、私の加入者にonNextを正常に取得します。
"repeatWhile"のドキュメントでは、最終的に私のサブスクライバで "onComplete"が呼び出されますが、 "onComplete"は呼び出されないと言われています。
"skipWhile"と "repeatWhen"の連鎖の順序を変更しても違いはありません。何故ですか ?
私はRxJavaがopensourceであり、コードを読むことができますが、私が言ったように、理解することは本当に難しいと思います。
ありがとうございました。
私はtypoを作成しました。 "call"メソッドから "false"を返すまで、元のObservableから何も(onError、onNext、onComplete)を放出しないことをskipWhileのドキュメントで見ました。 元のドキュメントです:指定した *条件が満たされている限り、Observableソースによって放出されたすべてのアイテムをスキップするObservableを返しますが、条件がfalseになるとすぐにすべての追加ソースアイテムを放出します。 最初に数回「処理」を行い、メソッドから「真」を返します。それは何も放出すべきではありません。それではなぜ再試行が起こっているのですか? –
#2について。いいえ、私はアクティビティのonStopでサブスクリプションの登録を解除するコードしかありませんが、そうではありません。私はまだ私はAPI呼び出しでサーバーをポーリングしているのを見ているでしょう:) –
最初のコメントについて - checkJob()の放出はおそらく 'onNext(CheckJobResponse)' - > 'onCompleted()'です。 'PROCESSING'を返すときに最初のものを転送するのをスキップしますが、' onCompleted() 'はまだ通過します。 (私はソースコードを見てこれを確認しました) –