Flux FluxのReactorでバッファリング処理を実装しようとしています。内部フラックスの各エミッションは、いくつかの属性によってグループ分けされ、バッファ期間が終了した後に放出されます。GroupedFluxのFluxをバッファリングするにはどうすればよいですか?
(問題を露出させるために簡略化)以下の試験は、所望の挙動を示す:
private static final long STEP_MILLIS = 50;
private static final Duration BUFFER_DURATION = Duration.ofMillis(STEP_MILLIS * 4);
@Test
public void testBuffer() throws Exception {
List<List<String>> buffers = new ArrayList<>();
UnicastProcessor<String> queue = UnicastProcessor.create();
FluxSink<String> sink = queue.sink();
Flux<Flux<String>> fluxOfFlux = queue.map(Flux::just);
// Buffering
fluxOfFlux.flatMap(Function.identity())
.transform(this::buffer)
.subscribe(buffers::add);
sink.next("TEST 1");
Thread.sleep(BUFFER_DURATION.toMillis() - STEP_MILLIS); // One "step" before Buffer should close
assertTrue(buffers.isEmpty());
sink.next("TEST 2");
assertTrue(buffers.isEmpty());
Thread.sleep(STEP_MILLIS * 2); // One "step" after Buffer should close
assertEquals(1, buffers.size());
assertEquals(Arrays.asList("TEST 1", "TEST 2"), buffers.get(0));
}
public Flux<List<String>> buffer(Flux<String> publisher) {
return publisher
.groupBy(Object::getClass)
.map(groupFlux -> groupFlux.take(BUFFER_DURATION).collectList())
.flatMap(Function.identity());
}
このテストでは、書かれたように動作します。
テストは、キューに入れ、すべての項目として失敗しがすぐにシングルトンとして放出される
// Buffering
fluxOfFlux.flatMap(flux -> flux.transform(this::buffer))
.subscribe(buffers::add);
「バッファ」:私は「flatMap」を組み合わせて、一つの操作にして操作を「変革」しようとすると問題が発生します。
これは、バッファリングする前にflatMapを使用すると、内部のFluxesの処理を並列化する機能が削除されるため、望ましくありません。
これはReactorで書かれていますが、Observableと同様の名前のメソッドを使用してRxJavaと1対1で対応する必要があります。
誰かが私が間違っているかもしれない考えを持っていますか?