RxJava観測値を使用してイベントシーケンスを設定しました。基本的には、Observable.just(Events.*)
で作成されたさまざまなイベントを、observable.delay(time, timeUnit, scheduler)
機能で設定されたさまざまな遅延でマージします。次に、PublishSubject
(events
のコード)に投稿し、そのPublishSubject
に登録して、シーケンス(以下のコードでobserveEvents()
の機能)を観察します。それはうまく動作するために使用されていましたが、最近私のデバイス(OnePlus One with Android 5.0.2)で非常に奇妙な動作が見られています(エミュレータでは表示されません)。基本的にイベントが混ざり合って、遅れのあるイベントが遅延の小さいイベントの前に来るようになり、遅延の小さなイベントがキューの最後に来ることがあります。最初の3つのイベントは、特に頻繁に混在しています。場合によっては、一部のイベントがまったく観察されないことがあります。ここで何が起こっているのでしょうか?RxJava遅延オブザーバブルが予期しない遅延で起動する
コードがKotlinである:あなたがconcat
ではなくmerge()
を使用しているため
var computationScheduler = Schedulers.computation()
private val events: PublishSubject<Events> = PublishSubject.create()
private val userActionSubject: PublishSubject<Events> = PublishSubject.create()
Observable.merge(
event0(),
event1(),
event2(),
userActionOrEvent3(),
userActionOrEvent4())
.subscribe({
// Weird timings are observed here already
events.onNext(it)
}, { e ->
events.onError(e)
}))
private fun userActionOrEvent4(): Observable<Events> {
return Observable.amb(Observable.just(Events.Event4)
.delay(12800, TimeUnit.MILLISECONDS, computationScheduler), userActionSubject.asObservable().subscribeOn(computationScheduler))
.take(1)
}
private fun userActionOrEvent3(): Observable<Events> {
return Observable.amb(Observable.just(Events.Event3)
.delay(2800, TimeUnit.MILLISECONDS, computationScheduler), userActionSubject.asObservable().subscribeOn(computationScheduler))
.take(1)
}
private fun event2() = Observable.just(Events.Event2)
.delay(1800, TimeUnit.MILLISECONDS, computationScheduler)
private fun event1() = Observable.just(Events.Event1)
.delay(200, TimeUnit.MILLISECONDS, computationScheduler)
private fun event0() = Observable.just(Events.Event0)
.subscribeOn(computationScheduler)
open fun observeEvents(): Observable<Events> = events.asObservable().observeOn(AndroidSchedulers.mainThread())
open fun onUserAction() {
userActionSubject.onNext(Events.Action)
}