ストリーム上でいくつかの操作を実行し、ストリームを2つのストリームに分割して別々に処理したいとします。RxJava:Split Rx複数のストリームにフロー可能
例問題表示する:
Flowable<SuccessfulObject> stream = Flowable.fromArray(
new SuccessfulObject(true, 0),
new SuccessfulObject(false, 1),
new SuccessfulObject(true, 2));
stream = stream.doOnEach(System.out::println);
Flowable<SuccessfulObject> successful = stream.filter(SuccessfulObject::isSuccess);
Flowable<SuccessfulObject> failed = stream.filter(SuccessfulObject::isFail);
successful.doOnEach(successfulObject -> {/*handle success*/}).subscribe();
failed.doOnEach(successfulObject -> {/*handle fail*/}).subscribe();
クラス:
class SuccessfulObject {
private boolean success;
private int id;
public SuccessfulObject(boolean success, int id) {
this.success = success;
this.id = id;
}
public boolean isSuccess() {
return success;
}
public boolean isFail() {
return !success;
}
public void setSuccess(boolean success) {
this.success = success;
}
@Override
public String toString() {
return "SuccessfulObject{" +
"id=" + id +
'}';
}
}
をしかし、私は一度だけ分割する前に、すべての操作を実行したいのに対し、このコードは二回、すべての要素を出力します。
出力:
OnNextNotification [SuccessfulObject {ID = 0}]
OnNextNotification [SuccessfulObject {ID = 1}]
OnNextNotification [SuccessfulObject {ID = 2}]
OnCompleteNotification
OnNextNotification [SuccessfulObject { ID = 0}]
OnNextNotification [SuccessfulObject {ID = 1}]
OnNextNotification [SuccessfulObject {ID = 2}]
OnCompleteNotification
この動作を受け取るためにストリームをどのように処理できますか?
を使用すると、1つのストリーム(?フォーク参加行動) –
いいえ、ちょうどスプリットストリームに戻って一緒に扱うの結果をマージして実行しますかすべての操作を個別に行います。 –
さて、@akarnokdのソリューションを使用してください。サイドノードとして:rx-pipelineに可変オブジェクトを使用しないでください。また、isFuccessはfalsにそれが失敗したことを意味するので、isFailは必要ではありません。 –