2016-01-22 14 views
5

RxJavaが本当に好きです。それは素晴らしいツールですが、時にはそれがどのように動作するのか非常に難しいです。 私たちはAndroidプロジェクトでRxJavaでRetrofitを使用していますが、次のユースケースがあります。"skipWhile"とRxJavaの "repeatWhen"を組み合わせてサーバーポーリングを実装する

サーバーが何らかの仕事をしているうちに、再試行の間に若干の遅延を伴ってサーバーをポーリングする必要があります。サーバーが終了したら、結果を提供する必要があります。私は成功しRxJavaでそれをやったので、ここでのコードスニペットは、次のとおりです。 私はコードが正常に動作します

Subscription checkJobSubscription = mDataManager.checkJob(prepareTweakJob) 
     .skipWhile(new Func1<CheckJobResponse, Boolean>() { 

      @Override 
      public Boolean call(CheckJobResponse checkJobResponse) { 
       boolean shouldSkip = false; 

       if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, jobStatus " + checkJobResponse.getJobStatus()); 

       switch (checkJobResponse.getJobStatus()){ 
        case CheckJobResponse.PROCESSING: 
         shouldSkip = true; 
         break; 
        case CheckJobResponse.DONE: 
        case CheckJobResponse.ERROR: 
         shouldSkip = false; 
         break; 
       } 
       if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, shouldSkip " + shouldSkip); 

       return shouldSkip; 
      } 
     }) 
     .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() { 
      @Override 
      public Observable<?> call(Observable<? extends Void> observable) { 
       if (SHOW_LOGS) Logger.v(TAG, "checkJob, repeatWhen " + observable); 
       return observable.delay(1, TimeUnit.SECONDS); 
      } 
     }).subscribe(new Subscriber<CheckJobResponse>(){ 
      @Override 
      public void onNext(CheckJobResponse response) { 
       if (SHOW_LOGS) Logger.v(TAG, "checkJob, onSuccess, response " + response); 

      } 

      @Override 
      public void onError(BaseError error) { 
       if (SHOW_LOGS) Logger.v(TAG, "checkJob, onError, canEditTimeline, error " + error); 
       Toast.makeText(ChoseEditOptionActivity.this, R.string.NETWORK__no_internet_message, Toast.LENGTH_LONG).show(); 

      } 

      @Override 
      public void onCompleted() { 
       if (SHOW_LOGS) Logger.v(TAG, "onCompleted"); 
      } 
     }); 

「repeatWhen」と「skipWhile」を使用します。サーバーが応答した場合

そのジョブを処理しているIを"skipWhile"チェーンから "true"を返すと、Observableは元のObservableが1秒間待機し、httpリクエストを再度実行します。 このプロセスは、 "skipWhile"チェーンから "false"を返すまで繰り返されます。私は私が戻るまで、それは本来の観測から何かを(のonError、onNext、onCompleteの)放出しないことを「skipWhile」のドキュメントで見た

  1. :ここ

    は、私は理解していないいくつかのことですその "呼び出し"メソッドから "false"を返します。だから、もし何も出さなければ、 "repeatWhen" Observableがそれをやっているのはなぜですか?それは1秒間待ってから再度要求を実行します。誰がそれを開始するのですか?

  2. 2番目の質問は、「repeatWhen」からの観測が永遠に実行されない理由、「skipWhile」から「false」を返したときに繰り返されない理由です。私は "false"を返した場合、私の加入者にonNextを正常に取得します。

  3. "repeatWhile"のドキュメントでは、最終的に私のサブスクライバで "onComplete"が呼び出されますが、 "onComplete"は呼び出されないと言われています。

  4. "skipWhile"と "repeatWhen"の連鎖の順序を変更しても違いはありません。何故ですか ?

私はRxJavaがopensourceであり、コードを読むことができますが、私が言ったように、理解することは本当に難しいと思います。

ありがとうございました。

答えて

13

以前はrepeatWhenで作業していませんでしたが、この質問は私に不思議な気持ちになりました。

skipWhileは、それがその前にtrueを返すことはありません場合でも、onErroronCompletedを発するん。したがって、checkJob()onCompletedを放出するたびにrepeatWhenが呼び出されています。それは質問#1に答えます。

残りの質問は、誤った前提に基づいています。 repeatWhenは決して終了しないため、定期購読は永久に実行されています。それはrepeatWhenがあなたよりも複雑な獣だからです。送信元からonCompletedを取得すると、Observablenullを送信します。あなたがそれを取り戻してonCompletedを返した場合は終了し、何も出力しない場合は再試行します。 delayは放散して遅延するため、常にnullを再度送信しています。そのように、それは絶えず再登録しています。

#2の答えは、それが永遠に実行されていることです。サブスクリプションをキャンセルするには、このコードの外にある何か他の処理を行っていると思われます#3の場合、決して完了しないので、onCompletedを得ることはありません。 #4では、無期限に繰り返しているため、順序は関係ありません。

問題は、正しい動作をどうやって得るのですか? skipWhileの代わりにtakeUntilを使用するのと同じくらい簡単です。そうすれば、まで繰り返すことで、望む結果が得られ、ストリームを終了したいときに終了させることができます。ここ

はサンプルコードは次のとおりです。この例で

Observable<Boolean> source = ...; // Something that eventually emits true 

source 
    .repeatWhen(completed -> completed.delay(1, TimeUnit.SECONDS)) 
    .takeUntil(result -> result) 
    .filter(result -> result) 
    .subscribe(
     res -> System.out.println("onNext(" + res + ")"), 
     err -> System.out.println("onError()"), 
     () -> System.out.println("onCompleted()") 
    ); 

sourceはブール値を発しています。ソースがtrueになるまで1秒ごとに繰り返します。私はresulttrueになるまで取る。そして、falseのすべての通知をフィルタリングして、trueになるまでサブスクライバに通知しないようにします。

+0

私はtypoを作成しました。 "call"メソッドから "false"を返すまで、元のObservableから何も(onError、onNext、onComplete)を放出しないことをskipWhileのドキュメントで見ました。 元のドキュメントです:指定した *条件が満たされている限り、Observableソースによって放出されたすべてのアイテムをスキップするObservableを返しますが、条件がfalseになるとすぐにすべての追加ソースアイテムを放出します。 最初に数回「処理」を行い、メソッドから「真」を返します。それは何も放出すべきではありません。それではなぜ再試行が起こっているのですか? –

+0

#2について。いいえ、私はアクティビティのonStopでサブスクリプションの登録を解除するコードしかありませんが、そうではありません。私はまだ私はAPI呼び出しでサーバーをポーリングしているのを見ているでしょう:) –

+0

最初のコメントについて - checkJob()の放出はおそらく 'onNext(CheckJobResponse)' - > 'onCompleted()'です。 'PROCESSING'を返すときに最初のものを転送するのをスキップしますが、' onCompleted() 'はまだ通過します。 (私はソースコードを見てこれを確認しました) –

関連する問題