RxJava BlockingObservable
を作成しようとしていますが、(条件== true)またはタイムアウトが発生するまで、Xミリ秒ごとに変数の値を出力します。継続的に放射するRxJava Observableの作成方法は?
以下のコードは私が望むものに近いようですが、常にONCEを出して終了します。奇妙なのは、私がtakeUntil()
の状態になっていることです。これは決して真実ではありません。この観測可能なものが継続的に放出され、最終的にはタイムアウトすると期待していますが、そうではありません。
私はここで間違っている/何をしていますか?
Observable.fromCallable(() -> getSendWindow())
.sample(10, TimeUnit.MILLISECONDS)
.timeout(30, TimeUnit.SECONDS)
.takeUntil(sendWindow -> 1==2)
.doOnError(throwable -> log.warn("Timed out waiting for send window to clear. Giving up."))
.doOnCompleted(() -> {
log.info("Send window cleared");
})
.toBlocking().forEach(sendWindow -> log.info("sendWindow={}, getSendWindow());
.sampleは、あなたはそれがないと思う何をしません。サンプルレートは上記のObservableを10秒に1回(最大で)制限します。 – Aron
文書によると:「サンプルオペレータは、Observableを定期的に見て、前回のサンプリング以来最も最近放出したアイテムを放出します。 http://www.intro.co.jp/Content/v1.0.10621.0/13_TimeShiftedSequences.html#Sample これは、指定された間隔で「サンプリング」し、その時に見えるものを放出することを意味するものではありません? そうでない場合、正しいアプローチは何ですか? – bitstream
それは正しいです。ただし、サンプリング・ウインドウの間は、項目は出力されません。したがって、「最近発行された」項目はありません。 – Aron