しますRXJava - 私は次の操作を行い観測を作成する(例えば、バッファ、ウィンドウで)pausable観測可能
- 彼らは、彼らがしている間
- はすぐに、アイテムを発する一時停止している間、すべての項目をバッファリング
- 一時停止/再開トリガーは別の観測から来る必要があります
- メインスレッドで実行されないオブザーバブルで保存する必要があります。保存する必要があります。メインスレッドから一時停止/再開状態を変更する必要があります。
トリガーとしてBehaviorSubject<Boolean>
を使用し、このトリガーをアクティビティーのonResume
およびイベントにバインドします。 (コード例は、追加)
質問
私はセットアップに何かをしましたが、意図したとおり、それが機能していません。私は次のようにそれを使用します。
Observable o = ...;
// Variant 1
o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue())
// Variant 2
// o = o.compose(RXPauser.applyPauser(getPauser()));
o
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
を現在の問題は、バリアント1が正常に動作しなければならないこと、であるが、時には、イベントは単に放出されない - バルブのすべてが動作するまでの弁は、発光されていない(かもしれスレッド問題...)!ソリューション2ははるかに単純で機能しているようですが、本当に良いかどうかは分かりませんが、そうは思わないです。私は実際には、なぜ解決策1が時々失敗しているのですか?解決策2が(現在私のために知られていない)問題を解決するかどうかわかりません...
誰かが問題か、シンプルなソリューションは確実に動作するはずですか?または、私に信頼できるソリューションを教えてください。
コード
RxValue
https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08
RXPauser機能
public static <T> Observable.Transformer<T, T> applyPauser(Observable<Boolean> pauser)
{
return observable -> pauser(observable, pauser);
}
private static <T> Observable<T> pauser(Observable<T> source, Observable<Boolean> pauser)
{
// this observable buffers all items that are emitted while emission is paused
Observable<T> sharedSource = source.publish().refCount();
Observable<T> queue = sharedSource
.buffer(pauser.distinctUntilChanged().filter(isResumed -> !isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> isResumed))
.flatMap(l -> Observable.from(l))
.doOnNext(t -> L.d(RXPauser.class, "Pauser QUEUED: " + t));
// this observable emits all items that are emitted while emission is not paused
Observable<T> window = sharedSource.window(pauser.distinctUntilChanged().filter(isResumed -> isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> !isResumed))
.switchMap(tObservable -> tObservable)
.doOnNext(t -> L.d(RXPauser.class, "Pauser NOT QUEUED: " + t));
// combine both observables
return queue.mergeWith(window)
.doOnNext(t -> L.d(RXPauser.class, "Pauser DELIVERED: " + t));
}
活性
public class BaseActivity extends AppCompatActivity {
private final BehaviorSubject<Boolean> pauser = BehaviorSubject.create(false);
public BaseActivity(Bundle savedInstanceState)
{
super(args);
final Class<?> clazz = this.getClass();
pauser
.doOnUnsubscribe(() -> {
L.d(clazz, "Pauser unsubscribed!");
})
.subscribe(aBoolean -> {
L.d(clazz, "Pauser - " + (aBoolean ? "RESUMED" : "PAUSED"));
});
}
public PublishSubject<Boolean> getPauser()
{
return pauser;
}
@Override
protected void onResume()
{
super.onResume();
pauser.onNext(true);
}
@Override
protected void onPause()
{
pauser.onNext(false);
super.onPause();
}
}
人々は疑問に非常に明確にしているという重要な要件で不足している、これまでのところ、あるのに役立ちます: _を」一時停止/再開トリガは別の観測可能な "_から来なければなりません。 彼らは固定された時間スケジュールを望まない。 –