2016-12-22 21 views
0

RxJava BlockingObservableを作成しようとしていますが、(条件== true)またはタイムアウトが発生するまで、Xミリ秒ごとに変数の値を出力します。継続的に放射するRxJava Observableの作成方法は?

以下のコードは私が望むものに近いようですが、常にONCEを出して終了します。奇妙なのは、私がtakeUntil()の状態になっていることです。これは決して真実ではありません。この観測可能なものが継続的に放出され、最終的にはタイムアウトすると期待していますが、そうではありません。

私はここで間違っている/何をしていますか?

Observable.fromCallable(() -> getSendWindow()) 
     .sample(10, TimeUnit.MILLISECONDS) 
     .timeout(30, TimeUnit.SECONDS) 
     .takeUntil(sendWindow -> 1==2) 
     .doOnError(throwable -> log.warn("Timed out waiting for send window to clear. Giving up.")) 
     .doOnCompleted(() -> { 
      log.info("Send window cleared"); 
     }) 
     .toBlocking().forEach(sendWindow -> log.info("sendWindow={}, getSendWindow()); 
+0

.sampleは、あなたはそれがないと思う何をしません。サンプルレートは上記のObservableを10秒に1回(最大で)制限します。 – Aron

+0

文書によると:「サンプルオペレータは、Observableを定期的に見て、前回のサンプリング以来最も最近放出したアイテムを放出します。 http://www.intro.co.jp/Content/v1.0.10621.0/13_TimeShiftedSequences.html#Sample これは、指定された間隔で「サンプリング」し、その時に見えるものを放出することを意味するものではありません? そうでない場合、正しいアプローチは何ですか? – bitstream

+1

それは正しいです。ただし、サンプリング・ウインドウの間は、項目は出力されません。したがって、「最近発行された」項目はありません。 – Aron

答えて

1

。サンプルは、あなたが思う通りのことをしません。サンプルレートは上記のObservableを10秒に1回(最大で)制限します。

Observable.fromCallable()は、イベントを1回だけ発行して完了します。

.sample()は10秒待機し、最後のイベント(存在する場合)を10秒ごとに送信します。したがって、イベントを1つしか持たないObservableにアタッチすると、1つのイベントが発生します。それから完了です。

あなたはおそらく(私は.netプログラマですので、私のケーシングなどを言い訳します)これはです。

編集:Javaが繰り返しイベントの間隔を使用することを伝えてくれた@akanokdに感謝します。

Observable.interval(10, timeUnit.MILLISECONDS) 
    .map(x -> getSendWindow()) 
    .takeUntil(sendWindow -> 1==2) 
    .doOnError(throwable -> log.warn("Timed out waiting for send window to clear. Giving up.")) 
    .doOnCompleted(() -> { 
      log.info("Send window cleared"); 
     }) 
    .toBlocking().forEach(sendWindow -> log.info("sendWindow={}, getSendWindow()); 

JAVA特定のバージョンへのAPI呼び出しでこの答えを編集すること自由に感じ...

+0

私はJavaの変更を加えました。主にselect()メソッドの変更RxJava2にはselect()が存在しないため、fromCallable()にはtimeout()を再追加しましたが、コードは引き続き単一の値を出してから終了します。 – bitstream

+0

@bitstream私はJavaを使ってからずっとずっと前から、 "select"の正しい名前はjavaの "map"だと思っています。 – Aron

+1

RxJavaの 'timer()'はワンショットのみで、 'interval()'が必要です。 – akarnokd

関連する問題