私は2つのストリームを持っています。最初のストリームはデータベースからデータを取り出すストリームで、データを取り終えたらonCompleted()
を呼び出します。 2番目のストリームはサーバーからライブデータを受け取り、onCompleted()
を決して呼び出さないストリームです。私がしたいのは、最初のストリーム(上流)が空のストリームの場合にアクションを実行できる演算子を作成することです。ここではサンプルです:ここではRxJavaでdoIfEmpty演算子を作成
getItemFromDatabase()
.lift(new DoIfEmptyOperator<Item>(new Action0() {
@Override
public void call() {
//Database is empty
System.out.println("Yeay successfully do an action");
}
}))
.concatWith(getItemFromServer()) // -----> intentionally never complete
.subscribe(new Subscriber<StoryItem>() {
@Override
public void onCompleted() {
//dosomething...
}
@Override
public void onError(Throwable e) {
//dosomething...
}
@Override
public void onNext(StoryItem storyItem) {
//dosomething
}
}));
はDoIfEmptyOperatorのコードは次のとおりです。
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
public class DoIfEmptyOperator<T> implements Observable.Operator<T,T>{
private Action0 action;
private boolean isEmpty = true;
public DoIfEmptyOperator(Action0 action) {
this.action = action;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> childSubscriber) {
Subscriber<T> parentSubscriber = new Subscriber<T>() {
@Override
public void onCompleted() {
if(isEmpty) {
action.call();
}
childSubscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
childSubscriber.onError(e);
}
@Override
public void onNext(T t) {
isEmpty = false;
childSubscriber.onNext(t);
}
};
childSubscriber.add(parentSubscriber);
return parentSubscriber;
}
}
下流が完了したことがないので、parentSubscriber onCompleted()
は、発射されていないため、アクションが実行されることはありませんが。私は
.concatWith(getItemFromServer())
を削除すると、アクションが実行されます。どのように問題を解決するかについての手掛かり?私はObservable.switchIfEmpty()のソースコードを参照していますが、それがどのように動作するかについての手掛かりはまだありません。
Ahhはそれについて考えたことはありません。 – SalacceoVanz