sampleTimeout
を使用すると、送信元の各受信値x
にFlux
X'
を関連付けることができます。 X'
がソースで次の値が出力される前に完了すると、値x
が出力されます。そうでない場合は、x
が削除されます。 同じ処理が後続の値に適用されます。
オリジナルのシーケンスを、各コンパニオンフラックスの開始と終了によって区切られたウィンドウに分割すると考えてください。 2つのウィンドウが重なっている場合、最初のウィンドウをトリガーした値はドロップされます。
sample(Duration)
には、単一のコンパニオンフラックスのみを扱っています。シーケンスを連続したウィンドウに分割し、特定のウィンドウ中に最後に放出された要素以外のすべての要素を削除します。
(編集):あなたのユースケースについて
私が正しく理解していれば、あなたはあなたが定期的にスケジュールする種々の長さの処理を持っているように見えるが、あなたはまた、値を検討する必要はありません処理には複数の処理が必要です期間?
publishOn
を使用して独自のスレッドで処理を分離し、2)要件の2番目の部分にはsample(Duration)
が必要です(タスクに割り当てられた遅延は変更されません)。このような
何か:
List<Long> passed =
//regular scheduling:
Flux.interval(Duration.ofMillis(200))
//this is only to show that processing is indeed started regularly
.elapsed()
//this is to isolate the blocking processing
.publishOn(Schedulers.elastic())
//blocking processing itself
.map(tuple -> {
long l = tuple.getT2();
int sleep = l % 2 == 0 || l % 5 == 0 ? 100 : 210;
System.out.println(tuple.getT1() + "ms later - " + tuple.getT2() + ": sleeping for " + sleep + "ms");
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
return l;
})
//this is where we say "drop if too long"
.sample(Duration.ofMillis(200))
//the rest is to make it finite and print the processed values that passed
.take(10)
.collectList()
.block();
System.out.println(passed);
出力:
205ms later - 0: sleeping for 100ms
201ms later - 1: sleeping for 210ms
200ms later - 2: sleeping for 100ms
199ms later - 3: sleeping for 210ms
201ms later - 4: sleeping for 100ms
200ms later - 5: sleeping for 100ms
201ms later - 6: sleeping for 100ms
196ms later - 7: sleeping for 210ms
204ms later - 8: sleeping for 100ms
198ms later - 9: sleeping for 210ms
201ms later - 10: sleeping for 100ms
196ms later - 11: sleeping for 210ms
200ms later - 12: sleeping for 100ms
202ms later - 13: sleeping for 210ms
202ms later - 14: sleeping for 100ms
200ms later - 15: sleeping for 100ms
[0, 2, 4, 5, 6, 8, 10, 12, 14, 15]
ので、ブロッキング処理は、約200毎にトリガされ、そして唯一の200ms以内に処理場所が保存されていることを値。
編集していただきありがとうございます。 – Gaurav
特に、私はFlux.intervalからの排出を減速させようとしており、Flux.sampleTimeout(RxJavaのdebounceに相当する?)が私に道を与えてくれることを期待しています。例えば、私はFlux.interval()。map(time_varying_function)で通知します。time_varying_functionにフラックス間隔よりも時間がかかる場合、 'flux.interval'イベントが蓄積されます。しかし、消費関数が遅い場合には、これらのイベントが累積することは望ましくありません。しかし、消費関数が速い場合には間隔を維持することを願っています。 – Gaurav
'map'関数をブロックすることは、' publishOn'を使って別のスケジューラにしたがってスレッド)。 –