2017-01-26 5 views

答えて

0

私はbuffer(Duration)があなたのニーズに合うだろうと思ったが、それはしていません。

編集:まったく同じ必要性を持つ人が、そのオペレータを使用するように誘惑されている場合は、このままにしてください。バッファのこの変形は、連続する時間ウィンドウ(それぞれがbufferを生成する)にシーケンスを分割する。つまり、新しいdelayは、前のものの最後から開始します。新しい遅延のない要素が放出されるとは限りません。

+0

なし、緩衝液で()遅延は、別の遅延が終了するたびに開始されます。現在の遅延がない場合は、各受信値にその値を設定し、値を返して遅延を開始するか、遅延が既に発生している場合はバッファリングする必要があります。 –

+0

答えが残っていて、なぜそれが適合していないのかを明確にするために編集しました –

1

これは、単純ではない組の演算子で実現できます。

import java.time.Duration; 
import java.util.*; 

import reactor.core.publisher.*; 

public class DelayedBuffer { 

    public static void main(String[] args) { 
     Flux.just(1, 2, 3, 6, 7, 10) 
     .flatMap(v -> Mono.delayMillis(v * 1000) 
       .doOnNext(w -> System.out.println("T=" + v)) 
       .map(w -> v) 
     ) 
     .compose(f -> delayedBufferAfterFirst(f, Duration.ofSeconds(2))) 
     .doOnNext(System.out::println) 
     .blockLast(); 
    } 

    public static <T> Flux<List<T>> delayedBufferAfterFirst(Flux<T> source, Duration d) { 
     return source 
     .publish(f -> { 
      return f.take(1).collectList() 
      .concatWith(f.buffer(d).take(1)) 
      .repeatWhen(r -> r.takeUntilOther(f.ignoreElements())); 
     }); 
    } 
} 

(予想される放射パターンが良好期限が関与しているにカスタムオペレータと一致させることができることは留意されたい。)

+0

ありがとう、それは私を助けましたが、これは私の必要性に合っています: .publish(f - > f.take(1).collectList() \t .concatWith(f.take(D).collectList() \t \t .filter(リスト! - > list.isEmpty()) \t \t .repeatWhen(R - > r.takeWhile(N - > N> 0)) ) \t .repeatWhen(r - > r.takeUntilOther(f.ignoreElements()))) –

関連する問題