2017-09-28 9 views
0

カスタムPublisherとリアクションストリーム(https://github.com/reactor/reactor-core)をpublishOn関数と組み合わせて使用​​しているときは、常にNPEを取得します。私のコードで何が間違っていますか? Publisherを間違った方法で使用していますか?ReactiveStreamsカスタムパブリッシャーでpublishOnを使用する場合のNPE

Flux.from(MyPublisher()) 
      .publishOn(Schedulers.single()) 
      .subscribe { println("<-- $it received") } 

class MyPublisher : Publisher<Int> { 
    override fun subscribe(sub: Subscriber<in Int>) { 
     while (true) { 
      Thread.sleep(300) 
      sub.onNext(1) 
     } 
    } 
} 

例外がある:

Exception in thread "main" java.lang.NullPointerException 
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:212) 
    at org.guenhter.kotlin.hello.MyPublisher.subscribe(HelloWorld.kt:18) 
    at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:52) 
    at reactor.core.publisher.FluxPublishOn.subscribe(FluxPublishOn.java:96) 
    at reactor.core.publisher.Flux.subscribe(Flux.java:6447) 
    at reactor.core.publisher.Flux.subscribeWith(Flux.java:6614) 
    at reactor.core.publisher.Flux.subscribe(Flux.java:6440) 
    at reactor.core.publisher.Flux.subscribe(Flux.java:6404) 
    at reactor.core.publisher.Flux.subscribe(Flux.java:6347) 
    at org.guenhter.kotlin.hello.HelloWorldKt.main(HelloWorld.kt:11) 
+0

downvoteだけではありませんが、この質問が悪いと思われる理由をお伝えください。 – guenhter

答えて

2

Publisher "は、反応性ストリーム" 規格によって定義され、要求の数を有しています。これらの要件の1つは、Subscriber.onSubscribe HASがプロトコルに従うために他の方法のいずれかの前に呼び出されることです。

あなたはこれをしていないので、おそらく適切に初期化されていないことを意味し、原子炉クラスの内部にNPEがあります。

しかし、この問題を修正したとしても、標準はという反応性を持つように設計されています。つまり、サブスクライバが要求したときにのみデータを送信します。あなたはそれがおそらく後でラインの下で例外を引き起こすかどうかにかかわらず、それをデータ送信します。 Flux.createを使用して、独自のPublisher実装を作成するのではなく、要求を適切に処理できるエミッタを作成します。

関連する問題