2017-06-19 42 views
1

Flux FluxのReactorでバッファリング処理を実装しようとしています。内部フラックスの各エミッションは、いくつかの属性によってグループ分けされ、バッファ期間が終了した後に放出されます。GroupedFluxのFluxをバッファリングするにはどうすればよいですか?

(問題を露出させるために簡略化)以下の試験は、所望の挙動を示す:

private static final long STEP_MILLIS = 50; 

private static final Duration BUFFER_DURATION = Duration.ofMillis(STEP_MILLIS * 4); 

@Test 
public void testBuffer() throws Exception { 
    List<List<String>> buffers = new ArrayList<>(); 

    UnicastProcessor<String> queue = UnicastProcessor.create(); 
    FluxSink<String> sink = queue.sink(); 

    Flux<Flux<String>> fluxOfFlux = queue.map(Flux::just); 

    // Buffering 
    fluxOfFlux.flatMap(Function.identity()) 
     .transform(this::buffer) 
     .subscribe(buffers::add); 

    sink.next("TEST 1"); 

    Thread.sleep(BUFFER_DURATION.toMillis() - STEP_MILLIS); // One "step" before Buffer should close 

    assertTrue(buffers.isEmpty()); 

    sink.next("TEST 2"); 

    assertTrue(buffers.isEmpty()); 

    Thread.sleep(STEP_MILLIS * 2); // One "step" after Buffer should close 

    assertEquals(1, buffers.size()); 
    assertEquals(Arrays.asList("TEST 1", "TEST 2"), buffers.get(0)); 
} 

public Flux<List<String>> buffer(Flux<String> publisher) { 
    return publisher 
     .groupBy(Object::getClass) 
     .map(groupFlux -> groupFlux.take(BUFFER_DURATION).collectList()) 
     .flatMap(Function.identity()); 
} 

このテストでは、書かれたように動作します。

テストは、キューに入れ、すべての項目として失敗し

がすぐにシングルトンとして放出される

// Buffering 
    fluxOfFlux.flatMap(flux -> flux.transform(this::buffer)) 
     .subscribe(buffers::add); 
「バッファ」:私は「flatMap」を組み合わせて、一つの操作にして操作を「変革」しようとすると問題が発生します。

これは、バッファリングする前にflatMapを使用すると、内部のFluxesの処理を並列化する機能が削除されるため、望ましくありません。

これはReactorで書かれていますが、Observableと同様の名前のメソッドを使用してRxJavaと1対1で対応する必要があります。

誰かが私が間違っているかもしれない考えを持っていますか?

答えて

1

私は組み合わされたアプローチの問題を理解しました。jistは、書かれたサンプルデータでは達成したいことができないということです。

凝縮されたアプローチを使用すると、実際にバッファリングされるのは、それぞれ内部のFluxです。これらのFluxのそれぞれは、シングルトンを放出した後で完了するため、これにより、囲みバッファが完了して放出されます。これは、私のアプリケーションで何がうまくいかないのかを知ることにつながりました。

私は内側のフラックスの中に、その後グループ私が最初にconcatMapにさらに上流の内側フラックスを生成してどのように変更することにより、自分のアプリケーションの設計では、この欠陥に対処しなければなりませんでした。

0

私はまだ原子炉から始まっていますが、あなたのコードは副作用がないとは思われません。 Keep状態(リスト<>バッファを使用している場合)は、将来不快なバグにつながる可能性があります。

あなたが怒鳴る

List<String> tests = Arrays.asList("TEST 1", "TEST 2", "TEST 3", "INTEGRATION 1", "INTEGRATION 2") 

@Test 
public void test() { 
    Flux<List<String>> bufferedFlux = Flux.fromIterable(tests).buffer(Duration.ofSeconds(5)); 

    StepVerifier.withVirtualTime(() -> bufferedFlux) 
      .expectNext(tests) 
      .verifyComplete(); 
} 

@Test 
public void group() { 
    Flux<GroupedFlux<String, String>> flux = Flux 
      .fromIterable(tests) 
      .groupBy(test -> test.replaceAll("[^a-zA-Z]", "")); 

    StepVerifier.create(flux) 
      .expectNextMatches(groupedFlux -> groupedFlux.key().equals("TEST")) 
      .expectNextMatches(groupedFlux -> groupedFlux.key().equals("INTEGRATION")) 
      .verifyComplete(); 


    Flux<String> sum = flux 
      .flatMap(groupedFlux -> groupedFlux 
        .map(test -> test.replaceAll("[^\\d.]", "")) 
        .map(Integer::valueOf) 
        .reduce((t1, t2) -> t1 + t2) 
        .map(total -> groupedFlux.key() + " TOTAL: " + total) 
      ).sort(); 

    StepVerifier.create(sum) 
      .expectNext("INTEGRATION TOTAL: 3") 
      .expectNext("TEST TOTAL: 6") 
      .verifyComplete(); 
} 
見ることができるようにあなただけのimmutablesを使用して同じことを達成することができます
関連する問題