2017-01-16 7 views
3

私はイベントをトリガーするObservableを取るAPIを持っています。RxJavaの指数バックオフ

インターネット接続が検出された場合はdefaultDelay秒ごとに値を送信し、接続がない場合はnumberOfFailedAttempts^2回遅延させて返します。

私は私がいる最大の問題はretryWhen's観測可能で、様々なスタイルの束を試してみたが一度だけ評価されています

Observable 
    .interval(defaultDelay,TimeUnit.MILLISECONDS) 
    .observeOn(Schedulers.io()) 
    .repeatWhen((observable) -> 
     observable.concatMap(repeatObservable -> { 
      if(internetConnectionDetector.isInternetConnected()){ 
       consecutiveRetries = 0; 
       return observable; 
      } else { 
       consecutiveRetries++; 
       int backoffDelay = (int)Math.pow(consecutiveRetries,2); 
       return observable.delay(backoffDelay, TimeUnit.SECONDS); 
       } 
     }).onBackpressureDrop()) 
    .onBackpressureDrop(); 

は私がしようとしてる何をする方法はありますか?私は関連する質問を見つけましたが(今検索することはできません)、取られたアプローチは動的な価値を伴わないようです。

答えて

3

はあなたのコードでは、2つのミスがあります。

  1. は、いくつかの観察可能なシーケンスを繰り返すためには、そのシーケンスは有限である必要があります。私。 intervalの代わりに、just、またはfromCallableのようなものを以下のサンプルで使用した方がよいでしょう。
  2. repeatWhenの内部関数から、新しい遅延観測可能なソースを返す必要があるので、observable.delay()の代わりにObservable.timer()を返す必要があります。

の作業コード:

public void testRepeat() throws InterruptedException { 
    logger.info("test start"); 

    int DEFAULT_DELAY = 100; // ms 
    int ADDITIONAL_DELAY = 100; // ms 
    AtomicInteger generator = new AtomicInteger(0); 
    AtomicBoolean connectionAlive = new AtomicBoolean(true); // initially alive 

    Disposable subscription = Observable.fromCallable(generator::incrementAndGet) 
      .repeatWhen(counts -> { 
       AtomicInteger retryCounter = new AtomicInteger(0); 
       return counts.flatMap(c -> { 
        int retry = 0; 
        if (connectionAlive.get()) { 
         retryCounter.set(0); // reset counter 
        } else { 
         retry = retryCounter.incrementAndGet(); 
        } 
        int additionalDelay = ADDITIONAL_DELAY * (int) Math.pow(retry, 2); 
        logger.info("retry={}, additionalDelay={}ms", retry, additionalDelay); 
        return Observable.timer(DEFAULT_DELAY + additionalDelay, TimeUnit.MILLISECONDS); 
       }); 
      }) 
      .subscribe(v -> logger.info("got {}", v)); 

    Thread.sleep(220); 
    logger.info("connection dropped"); 
    connectionAlive.set(false); 
    Thread.sleep(2000); 
    logger.info("connection is back alive"); 
    connectionAlive.set(true); 
    Thread.sleep(2000); 
    subscription.dispose(); 
    logger.info("test complete"); 
} 

repeatWhenhereに関する詳細な記事を参照してください。

+0

問題のサンプルはおそらく、私が使った2つのアプローチ(1つのタイマー+再試行ベース、1つのインターバル+遅延サブスクリプションベース)を混在させるようだから、試行錯誤の途中だったでしょう。リトライ/リピート可能な入力を再度使用する必要があります。その観察可能な原因を使用しても、サブスクリプションの漏れが問題にならないのですか? –

+1

@AssortedTrailmixこれは第1レベルの入力についてのものであり、内部の「flatMap」に関するものではありません。この記事の最後の例は、非常によく似たパターンです。 –

+0

オハイオ州参照してください、私は逃した申し訳ありませんが 'カウントは'フラットになっていたものだった –

1

retryWhen演算子を使用すると、接続がないときに遅延を設定できます。定期的にアイテムを放出する方法は別個のトピックです(intervalまたはtimerオペレータを参照してください)。あなたがそれを理解できなければ、別の質問を開きます。

私はGithubの広範な例を持っていますが、私はあなたにここの要点を教えてくれます。

RetryWithDelay retryWithDelay = RetryWithDelay.builder() 
    .retryDelayStrategy(RetryDelayStrategy.RETRY_COUNT) 
    .build() 

Single.fromCallable(() -> { 
    ... 
}).retryWhen(retryWithDelay) 
.subscribe(j -> { 
    ... 
}) 

RetryWithDelayは、以下のように定義される。私はRxJava 2.xを使っていたので、もしあなたが1.xを使っているなら、署名はFunc1<Observable<? extends Throwable>, Observable<Object>>でなければなりません。

public class RetryWithDelay implements 
     Function<Flowable<? extends Throwable>, Publisher<Object>> { 
    ... 
} 

RetryWithDelayクラス。

RetryStrategy enum。

これにより、RetryDelayStrategyに基づいて、さまざまな種類のタイムアウト(定数、線形、指数関数)を設定できます。ユースケースの場合は、CONSTANT_DELAY_TIMES_RETRY_COUNT遅延戦略を選択し、RetryWithDelayをビルドするときはretryDelaySeconds(2)に電話してください。

retryWhenは複雑な、おそらくバグの多い演算子です。ほとんどの例では、range演算子をオンラインで使用していますが、再試行がないと失敗します。詳細は私の答えhereを参照してください。

2

指数バックオフのために私はいつもretryWhenが多少低レベルであることを発見しました。私はユニットテスト済みでRxJava 1.xのrxjava-extrasで利用可能なビルダー(Abhijitなど)を使用します。私は遅延の指数関数的な増加があなたが定義する最大値を超えないように、キャップ付きバージョンを使用することをお勧めします。

これは、あなたがそれを使用する方法です:

observable.retryWhen(
    RetryWhen.exponentialBackoff(
     delay, maxDelay, TimeUNIT.SECONDS) 
    .build()); 

あなたはそれRxJavaにバグレポートを見つけた場合、私はretryWhenにバグがあることを同意しないけど。バグは速く修正されています!

あなたはMavenの中央にある rxjava-エキストラ 0.8.0.6以降が必要です:

<dependency> 
    <groupId>com.github.davidmoten</groupId> 
    <artifactId>rxjava-extras</artifactId> 
    <version>0.8.0.6</version> 
</dependency> 

あなたはRxJava 2.xのバージョンが必要な場合は、私に教えてください。同じ機能が0.130からrxjava2-extrasで利用可能です。

+0

私はこれをどこかで見たことが分かっていました!私はここに車輪を再現したくないので、おそらく私はこれと一緒に行って、どうやったらそれをどうしたらいいのか見てみようと思っている。 –

+0

今日、私は最大のバックオフを実装するのを忘れていたが、そのメソッドのような署名は私のために存在しませんバージョン0.8.0.6を使用して –

+0

私は急いでいたので、私は私がそれを必要とするコードの部分に戻ってくるだろうと自分自身に言った、私は期待した振る舞いを持っています。それは、呼び出しが成功した後に再試行しなければならないことです(これは "帯域外"通信を必要とするので意味があります)。私はrepeatWhenのアプローチが以下のように私の現在の状況で必要なものだと思っています。このアプローチは、 "動作するまで再試行する"と "再試行する"と "動作しないと長くする"のより一般的な状況に最適化されています。 –

関連する問題