2017-07-10 17 views
1

私はちょうど何かを欠いていると確信しています。私は、次のコードを実行しているよ:Java Reactor onComplete inconsistency

@Test 
public void simpleCreation() throws Exception { 
    Iterator<String> data = ImmutableList.of("1", "2", "3").iterator(); 
    Flux<String> stringFlux = Flux.create(emmiter -> { 
     while (data.hasNext()) { 
      emmiter.next(data.next()); 
     } 
     emmiter.complete(); 
    }); 
    ConnectableFlux<String> connectableFlux = stringFlux.publish(); 

    connectableFlux.doOnComplete(() -> System.out.println("connectableFlux.doOnComplete")); 
    stringFlux.doOnComplete(() -> System.out.println("stringFlux.doOnComplete")); 

    CountDownLatch completeLatch = new CountDownLatch(1); 
    Disposable disposable = connectableFlux.subscribe(s -> { 
     System.out.println("subscribe: data: " + s); 
    }, error -> { }, completeLatch::countDown); 

    connectableFlux.connect(); 

    completeLatch.await(); 
    disposable.dispose(); 
} 

と、それは「connectableFlux.doOnComplete」または「stringFlux.doOnComplete」のいずれか、または両方を印刷することを期待するが、私はどちらを参照してください。 subscribeメソッドからのOnCompleteコールバックは問題なく実行されますが、これらのメソッドのどちらも呼び出されず、なぜそれほどよく見えません。

私にとっては、やや矛盾しているようです。ある場所ではコールバックが呼び出され、他は無視されます。私はdoOnNextで同様の動作を観察することができます。

誰かがその背後にある概念を説明できるかどうかは分かります。私はそれがバグではないと確信していますが、フレームワークや一般的な概念について私が見逃しているものです。

答えて

3

この行は、問題を引き起こしている:

connectableFlux.doOnComplete(() -> System.out.println("connectableFlux.doOnComplete")); 

doOnComplete()への呼び出しの結果は無視されます。このメソッドは、subscribe()を呼び出すFluxインスタンスの新しいバージョンを返します。古いconnectableFluxインスタンスにロジックを追加しません。

はこのようにそれを試してみてください。

Iterator<String> data = ImmutableList.of("1", "2", "3").iterator(); 
Flux<String> stringFlux = Flux.create(emmiter -> { 
    while (data.hasNext()) { 
     emmiter.next(data.next()); 
    } 
    emmiter.complete(); 
}); 

stringFlux.doOnComplete(() -> System.out.println("stringFlux.doOnComplete()")) 
     .subscribe(s -> System.out.println("subscribe: data: " + s), error -> {}) 
     .dispose(); 

stringFlux.publish().connect(); 
関連する問題