2017-05-04 7 views
0

ストリーム上でいくつかの操作を実行し、ストリームを2つのストリームに分割して別々に処理したいとします。RxJava:Split Rx複数のストリームにフロー可能

例問題表示する:

Flowable<SuccessfulObject> stream = Flowable.fromArray(
     new SuccessfulObject(true, 0), 
     new SuccessfulObject(false, 1), 
     new SuccessfulObject(true, 2)); 

stream = stream.doOnEach(System.out::println); 

Flowable<SuccessfulObject> successful = stream.filter(SuccessfulObject::isSuccess); 
Flowable<SuccessfulObject> failed = stream.filter(SuccessfulObject::isFail); 

successful.doOnEach(successfulObject -> {/*handle success*/}).subscribe(); 
failed.doOnEach(successfulObject -> {/*handle fail*/}).subscribe(); 

クラス:

class SuccessfulObject { 
    private boolean success; 
    private int id; 

    public SuccessfulObject(boolean success, int id) { 
     this.success = success; 
     this.id = id; 
    } 

    public boolean isSuccess() { 
     return success; 
    } 
    public boolean isFail() { 
     return !success; 
    } 

    public void setSuccess(boolean success) { 
     this.success = success; 
    } 

    @Override 
    public String toString() { 
     return "SuccessfulObject{" + 
       "id=" + id + 
       '}'; 
    } 
} 

をしかし、私は一度だけ分割する前に、すべての操作を実行したいのに対し、このコードは二回、すべての要素を出力します。

出力:

OnNextNotification [SuccessfulObject {ID = 0}]
OnNextNotification [SuccessfulObject {ID = 1}]
OnNextNotification [SuccessfulObject {ID = 2}]
OnCompleteNotification
OnNextNotification [SuccessfulObject { ID = 0}]
OnNextNotification [SuccessfulObject {ID = 1}]
OnNextNotification [SuccessfulObject {ID = 2}]
OnCompleteNotification

この動作を受け取るためにストリームをどのように処理できますか?

+0

を使用すると、1つのストリーム(?フォーク参加行動) –

+0

いいえ、ちょうどスプリットストリームに戻って一緒に扱うの結果をマージして実行しますかすべての操作を個別に行います。 –

+1

さて、@akarnokdのソリューションを使用してください。サイドノードとして:rx-pipelineに可変オブジェクトを使用しないでください。また、isFuccessはfalsにそれが失敗したことを意味するので、isFailは必要ではありません。 –

答えて

3

使用publishソースへのサブスクリプションを共有する:

Flowable<Integer> source = Flowable.range(1, 5); 

ConnectableFlowable<Integer> cf = source.publish(); 

cf.filter(v -> v % 2 == 0).subscribe(v -> System.out.println("Even: " + v)); 

cf.filter(v -> v % 2 != 0).subscribe(v -> System.out.println("Odd: " + v)); 

cf.connect(); 
+0

私が探していたもの、感謝! –

関連する問題