2016-12-10 7 views
0

ここでは、テストコードRX-Javaの郵便番号が失敗した理由

final Flowable<Integer> f1 = Flowable.fromPublisher(s -> { 
     s.onNext(Integer.valueOf(1)); 
     s.onComplete(); 
    }); 


    final Flowable<Integer> f2 = Flowable.fromPublisher(s -> { 
     s.onNext(Integer.valueOf(2)); 
     s.onComplete(); 
    }); 

    Flowable.zip(f1, f2, (i1, i2) -> "" + i1 + i2) 
      .blockingSubscribe(System.out::println); 

はそれが理由を理解していない

Exception in thread "main" java.lang.NullPointerException 
    at io.reactivex.internal.operators.flowable.FlowableZip$ZipSubscriber.onNext(FlowableZip.java:386) 

Iを取得するのですか?

そして、私はこの

final Flowable<Integer> f1 = Flowable.<Integer>fromPublisher(s -> { 
     s.onNext(Integer.valueOf(1)); 
     s.onComplete(); 
    }).onErrorResumeNext(Flowable.empty()); 


    final Flowable<Integer> f2 = Flowable.<Integer>fromPublisher(s -> { 
     s.onNext(Integer.valueOf(2)); 
     s.onComplete(); 
    }).onErrorResumeNext(Flowable.empty()); 

    Flowable.zip(f1, f2, (i1, i2) -> "" + i1 + i2) 
      .blockingSubscribe(System.out::println); 

のようなコードを更新する場合に予想されるように、それは12を出力します。しかし、なぜ?それは意味をなさない。

+0

背圧が間違っていると思います。 –

答えて

1

fromPublisherを使用してPublisher<T>の契約を破棄しているという問題があります。

出版社は、Reactive Streams契約書に明記されているように、非常に具体的な行動をとる必要があります。この動作には、他の電話をかけてその加入者のバックプレッシャを尊重する前にSubscriber.onSubscribe()に電話することが含まれます。

onSubscribeを呼び出さないので、内部のqueue変数は決して初期化されず、をonNextメソッドで呼び出すとNPEが発生します。

おそらくonErrorResumeNextを使用することによって、実装はすべてが適切に呼び出され、無効な状態が「固定」されることを保証します。

  1. Flowable.fromPublisherを使用しないでください:二つの可能性がある問題を修正するに

    。これは、リアクティブストリーム宣言の他の実装から橋渡しすることを意図しており、いかなる保護手段も持たない。代わりに、Flowable.createを使用して、初期化とバックプレッシャを正しく処理します。

  2. バックプレッシャーに対応していないObservableを使用してください。ユースケースはバックプレッシャーには関係していないようです。安全に使用するには、もう一度Observable.createメソッドを使用してください。
+0

ありがとう、良い説明。 –