2017-05-30 3 views
0

のJavaドキュメントには、次のように言う:プロジェクト・リアクターAPIのFlux :: sampleTimeoutメソッドの目的は何ですか?

は、その特定の最後の値のための出版社によって提供される時間窓の間に放出された新しい値が存在しない場合にのみ、このフラックスからの最後の値を発します。

しかし、私は上記の説明が混乱していることを発見しました。私はgitter chatでRxJavaのdebounceと似ています。誰かが例を挙げて説明してもらえますか?私は徹底的な検索を行った後、どこでもこれを見つけることができませんでした。

+0

編集していただきありがとうございます。 – Gaurav

+0

特に、私はFlux.intervalからの排出を減速させようとしており、Flux.sampleTimeout(RxJavaのdebounceに相当する?)が私に道を与えてくれることを期待しています。例えば、私はFlux.interval()。map(time_varying_function)で通知します。time_varying_functionにフラックス間隔よりも時間がかかる場合、 'flux.interval'イベントが蓄積されます。しかし、消費関数が遅い場合には、これらのイベントが累積することは望ましくありません。しかし、消費関数が速い場合には間隔を維持することを願っています。 – Gaurav

+0

'map'関数をブロックすることは、' publishOn'を使って別のスケジューラにしたがってスレッド)。 –

答えて

0

sampleTimeoutを使用すると、送信元の各受信値xFluxX'を関連付けることができます。 X'がソースで次の値が出力される前に完了すると、値xが出力されます。そうでない場合は、xが削除されます。 同じ処理が後続の値に適用されます。

オリジナルのシーケンスを、各コンパニオンフラックスの開始と終了によって区切られたウィンドウに分割すると考えてください。 2つのウィンドウが重なっている場合、最初のウィンドウをトリガーした値はドロップされます。

sample(Duration)には、単一のコンパニオンフラックスのみを扱っています。シーケンスを連続したウィンドウに分割し、特定のウィンドウ中に最後に放出された要素以外のすべての要素を削除します。

(編集):あなたのユースケースについて

私が正しく理解していれば、あなたはあなたが定期的にスケジュールする種々の長さの処理を持っているように見えるが、あなたはまた、値を検討する必要はありません処理には複数の処理が必要です期間

publishOnを使用して独自のスレッドで処理を分離し、2)要件の2番目の部分にはsample(Duration)が必要です(タスクに割り当てられた遅延は変更されません)。このような

何か:

List<Long> passed = 
     //regular scheduling: 
    Flux.interval(Duration.ofMillis(200)) 
     //this is only to show that processing is indeed started regularly 
    .elapsed() 
     //this is to isolate the blocking processing 
    .publishOn(Schedulers.elastic()) 
     //blocking processing itself 
    .map(tuple -> { 
     long l = tuple.getT2(); 
     int sleep = l % 2 == 0 || l % 5 == 0 ? 100 : 210; 
     System.out.println(tuple.getT1() + "ms later - " + tuple.getT2() + ": sleeping for " + sleep + "ms"); 
     try { 
      Thread.sleep(sleep); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
     return l; 
    }) 
     //this is where we say "drop if too long" 
    .sample(Duration.ofMillis(200)) 
     //the rest is to make it finite and print the processed values that passed 
    .take(10) 
    .collectList() 
    .block(); 

System.out.println(passed); 

出力:

205ms later - 0: sleeping for 100ms 
201ms later - 1: sleeping for 210ms 
200ms later - 2: sleeping for 100ms 
199ms later - 3: sleeping for 210ms 
201ms later - 4: sleeping for 100ms 
200ms later - 5: sleeping for 100ms 
201ms later - 6: sleeping for 100ms 
196ms later - 7: sleeping for 210ms 
204ms later - 8: sleeping for 100ms 
198ms later - 9: sleeping for 210ms 
201ms later - 10: sleeping for 100ms 
196ms later - 11: sleeping for 210ms 
200ms later - 12: sleeping for 100ms 
202ms later - 13: sleeping for 210ms 
202ms later - 14: sleeping for 100ms 
200ms later - 15: sleeping for 100ms 
[0, 2, 4, 5, 6, 8, 10, 12, 14, 15] 

ので、ブロッキング処理は、約200毎にトリガされ、そして唯一の200ms以内に処理場所が保存されていることを値。

+0

ありがとうサイモン。私が本質的に望むのは、Flux.intervalが時間変動関数(計算集約型)が磁束間隔よりも長くかかる場合、次の放出を遅らせることです。だから、私は長い計算を落としたいとは思わない。間隔が2秒で、最初に計算に1.5秒かかり、2回目の計算に3秒かかってしまい、Flux.intervalの3回目の放出が2回目の計算が完了するまで遅れるということです。背圧なしにこれを行う。 – Gaurav

+0

Flux.interval(duration、Schedulers.elastic())を使用しています。なぜなら、私のユースケースでは、これらのフラックスがフラットマッピングされているからです。私はいくつかのサンプルコードで自分の投稿を編集しようとします。 – Gaurav

+0

私は上記の質問は、独自のサンプルコードとサンプルを使って別の投稿にする必要があると考えました。https://stackoverflow.com/questions/44317411/how-can-one-slow-down-emissions-form-flux-interval – Gaurav

関連する問題