2017-06-19 4 views
0

私はPublishSubjectを使用する次のコードを持っています。RxJava2 PublishSubjectでの予期しない動作

val subject = PublishSubject.create<Int>() 

val o1: Observable<String> = 
     subject.observeOn(Schedulers.newThread()).map { i: Int -> 
      println("${Thread.currentThread()} | ${Date()} | map => $i") 
      i.toString() 
     } 

o1.subscribe { 
    println("${Thread.currentThread()} | ${Date()} | direct subscription (1) => $it") 
} 

o1.subscribe { 
    println("${Thread.currentThread()} | ${Date()} | direct subscription (2) => $it") 
} 

o1.subscribe { 
    println("${Thread.currentThread()} | ${Date()} | direct subscription (3) => $it") 
} 

println("${Thread.currentThread()} | ${Date()} | submitting 1") 

subject.onNext(1) 

1)私は、この例の目的のために、私はちょうどString)=>o1に変換しています(それからObservableを作成し、それをマップします。

2)私はo1に3回申し込む。

3)最後に、subject.onNext(1)を呼び出してイベントを「公開」します。

Thread[main,5,main] | Mon Jun 19 09:46:37 PDT 2017 | submitting 1 
Thread[RxNewThreadScheduler-1,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1 
Thread[RxNewThreadScheduler-2,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1 
Thread[RxNewThreadScheduler-3,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1 
Thread[RxNewThreadScheduler-1,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (1) => 1 
Thread[RxNewThreadScheduler-2,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (2) => 1 
Thread[RxNewThreadScheduler-3,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (3) => 1 

mapが3回呼び出されてしまうと、私はmapが発生した後に起こるべきo1に加入しておりますので、なぜ私は理解していない:私の驚きに

は、私は次の出力を取得しています。私は何かを欠いているに違いない。どんな助けもありがとう。

おかげ ヤン

+0

あなたは 'on1'を3回購読し、' onNext'をすべての3つのチェーンにディスパッチする 'PublishSubject'まで独立したシーケンスを作成します。 – akarnokd

+0

あなたは「PublishSubject'まで」と言っています:なぜそれは主題に至るまでですか?これがどこで説明されるのか教えていただけますか?これが正常な動作であれば、地図の後にストリームを変換して、それができないようにする方法がありますか? – yan

+0

PublishSubjectは、3人のすべてのサブスクライバの観点から、subscribe()コールによって確立された独立したチェーンを通じてイベントを通知するマルチキャストソースです。 – akarnokd

答えて

1

のコメント:

あなたはそれぞれがすべての3つのチェーンへonNextを派遣しますPublishSubjectまでの独立したシーケンスを作成し、o1に3回をサブスクライブします。

PublishSubjectは、すべての3人の加入者の観点から、subscribe()コールによって確立された独立したチェーンを通じてイベントを通知するマルチキャストソースです。これらの作業要素は彼らだけが購読している唯一の情報源Subjectに執着するのでSubject

適用する事業者は、一般的にチェーン全体が熱くなりません。したがって、複数のサブスクリプションは同じアップストリームに複数のチャネルを生成しますSubject

を使用して、ConnectableObservable(まったく別のPublishSubject)を取得して、その時点から熱くなるシーケンスを作成します。

+0

私は、 'map'の後に' .publish.autoConnect() 'を追加することで正しいことを確認します。私は他の 'PublishSubject'_が何を意味しているかを100%確信していませんが、私が作ったのは' .subscribe {otherSubject.onNext(it)} 'を追加して、この他のサブジェクトでサブスクリプションを起こさせることでした。 – yan

+0

ループを閉じるには、2つのオプションhttps://gist.github.com/ypujante/2ab3c3a135272ea4bc4554cbfc287ca7を説明する要点を作成しました。最終的には、2番目のテーマでオプションを選択しました。 – yan