私はPublishSubject
を使用する次のコードを持っています。RxJava2 PublishSubjectでの予期しない動作
val subject = PublishSubject.create<Int>()
val o1: Observable<String> =
subject.observeOn(Schedulers.newThread()).map { i: Int ->
println("${Thread.currentThread()} | ${Date()} | map => $i")
i.toString()
}
o1.subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (1) => $it")
}
o1.subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (2) => $it")
}
o1.subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (3) => $it")
}
println("${Thread.currentThread()} | ${Date()} | submitting 1")
subject.onNext(1)
1)私は、この例の目的のために、私はちょうどString
)=>o1
に変換しています(それからObservable
を作成し、それをマップします。
2)私はo1
に3回申し込む。
3)最後に、subject.onNext(1)
を呼び出してイベントを「公開」します。
Thread[main,5,main] | Mon Jun 19 09:46:37 PDT 2017 | submitting 1
Thread[RxNewThreadScheduler-1,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1
Thread[RxNewThreadScheduler-2,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1
Thread[RxNewThreadScheduler-3,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1
Thread[RxNewThreadScheduler-1,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (1) => 1
Thread[RxNewThreadScheduler-2,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (2) => 1
Thread[RxNewThreadScheduler-3,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (3) => 1
map
が3回呼び出されてしまうと、私はmap
が発生した後に起こるべきo1
に加入しておりますので、なぜ私は理解していない:私の驚きに
は、私は次の出力を取得しています。私は何かを欠いているに違いない。どんな助けもありがとう。
おかげ ヤン
あなたは 'on1'を3回購読し、' onNext'をすべての3つのチェーンにディスパッチする 'PublishSubject'まで独立したシーケンスを作成します。 – akarnokd
あなたは「PublishSubject'まで」と言っています:なぜそれは主題に至るまでですか?これがどこで説明されるのか教えていただけますか?これが正常な動作であれば、地図の後にストリームを変換して、それができないようにする方法がありますか? – yan
PublishSubjectは、3人のすべてのサブスクライバの観点から、subscribe()コールによって確立された独立したチェーンを通じてイベントを通知するマルチキャストソースです。 – akarnokd