1
A
答えて
0
私はbuffer(Duration)
があなたのニーズに合うだろうと思ったが、それはしていません。
編集:まったく同じ必要性を持つ人が、そのオペレータを使用するように誘惑されている場合は、このままにしてください。バッファのこの変形は、連続する時間ウィンドウ(それぞれがbuffer
を生成する)にシーケンスを分割する。つまり、新しいdelay
は、前のものの最後から開始します。新しい遅延のない要素が放出されるとは限りません。
1
これは、単純ではない組の演算子で実現できます。
import java.time.Duration;
import java.util.*;
import reactor.core.publisher.*;
public class DelayedBuffer {
public static void main(String[] args) {
Flux.just(1, 2, 3, 6, 7, 10)
.flatMap(v -> Mono.delayMillis(v * 1000)
.doOnNext(w -> System.out.println("T=" + v))
.map(w -> v)
)
.compose(f -> delayedBufferAfterFirst(f, Duration.ofSeconds(2)))
.doOnNext(System.out::println)
.blockLast();
}
public static <T> Flux<List<T>> delayedBufferAfterFirst(Flux<T> source, Duration d) {
return source
.publish(f -> {
return f.take(1).collectList()
.concatWith(f.buffer(d).take(1))
.repeatWhen(r -> r.takeUntilOther(f.ignoreElements()));
});
}
}
(予想される放射パターンが良好期限が関与しているにカスタムオペレータと一致させることができることは留意されたい。)
+0
ありがとう、それは私を助けましたが、これは私の必要性に合っています: .publish(f - > f.take(1).collectList() \t .concatWith(f.take(D).collectList() \t \t .filter(リスト! - > list.isEmpty()) \t \t .repeatWhen(R - > r.takeWhile(N - > N> 0)) ) \t .repeatWhen(r - > r.takeUntilOther(f.ignoreElements()))) –
関連する問題
- 1. リアクタ・ルータ3ブレッドクラム・ドロップダウン
- 2. リアクタ内のフラックスをシリアライズ
- 3. リアクタ・ルータ3レンダリングされません
- 4. リアクタ・ルータ - アヤックス・ディープリンクアップ
- 5. isomorphic react.js without flux
- 6. Clojure Storm Flux
- 7. リアクタ対プロクタ
- 8. リアクタのマッチミス
- 9. リアクタ・ルータonChangeフック
- 10. リアクタ・ルータOnChangeリダイレクト
- 11. リアクタ電卓
- 12. リアクタ・レンダリング・イベント
- 13. リアクタ認可
- 14. リアクタのショーデータ
- 15. リアクタ・ルータ・パラメータ
- 16. リアクタ認証リダイレクト
- 17. Flux Utils MapStore
- 18. ワンアクション複数ストアリスナー - Flux
- 19. リアクタのルータ設定
- 20. リアクタ - ルータ - Reduxとリアクトブートストラップ
- 21. リアクタ2v対リアクタ3v - 違いは何ですか?
- 22. リアクタがリアクタでレンダリングされていない
- 23. NavigationExperimentalとReact-native-Router-Flux
- 24. Typo3 7.6 Fluxバックエンドレイアウトの列タイトル
- 25. Redux/Flux(ReactJS付き)とアニメーション
- 26. Spring Web-Flux:要求に応じてFluxをWebクライアントに戻す方法は?
- 27. Apache Storm Fluxの変更トポロジ
- 28. Fluidtypo3 Flux - テーブルフィールドに保存
- 29. リアクタ・ルータ - デフォルト・ルートなし
- 30. ReactJS + Flux - サブコンポーネントデータ用に保存
なし、緩衝液で()遅延は、別の遅延が終了するたびに開始されます。現在の遅延がない場合は、各受信値にその値を設定し、値を返して遅延を開始するか、遅延が既に発生している場合はバッファリングする必要があります。 –
答えが残っていて、なぜそれが適合していないのかを明確にするために編集しました –