2016-10-19 8 views
0

私は接続可能なオブザーバーが複数の加入者を持っています。接続可能な観測者:すべての加入者の終了を観察

各サブスクライバは、いくつかのビジネスロジックを計算します。例えば、ある加入者は、すべてのonNextコールでデータベースに結果を格納し、他の加入者は、その結果をメモリに蓄積し、onCompletedが呼び出されてファイルに書き込まれます。彼らがいつ仕事を終えたかを知りたいので、他のことをやり遂げることができます(接続可能な他のオブザーバーとの集約、データベースからの出力データの読み込みなど)。

これは私が終了を観察している方法です。サブスクライバはオブザーバと同じスレッドで実行されるため、これはうまく動作します。 observeTerminationが呼び出されたときに

public Observable<Boolean> observeTermination() { 
    return Observable.defer(() -> { 
     try { 
      start(); 
      return Observable.just(true); 
     } catch (RuntimeException e) { 
      return Observable.just(false); 
     } 
    }); 
} 

void start() { 
    Observable<List<Foo>> fooBatchReaderObservable = fooBatchReader.createObservable(BATCH_SIZE); 

    ConnectableObservable<List<Foo>> connectableObservable = fooBatchReaderObservable.publish(); 
    subscribers.forEach(s -> connectableObservable.subscribe(s)); 

    connectableObservable.connect(); 
} 

は、だから私はstart方法でロジックを実行する必要はありませんが、誰かがそれに加入している場合のみです。 観測を改善する方法はありますか?

まあまあです。問題は、どこかで観測可能なconnectを呼び出す必要があり、終了の結果としてbooleanの結果も返す必要があるということです。

答えて

0

正解ではありませんが、正しく説明するためのスペースが必要です。サブスクライバの代わりにオブザーバブルを扱うことができればはるかに簡単です。それはそれらを構成する際にはるかに柔軟性があります。

あなたはcomponents持っていることを考える:実際には

Collection<Function<Observable<T>, Observable<?>>> components: 
Observable<T> tObs = ... .publish().autoConnect(components.size()); 
Observable 
.from(components) 
.flatMap(component -> component.apply(tObs)) 
.ignoreElements() 
.doOnTerminate() // or .defaultIfEmpty(...), or .switchIfEmpty(...) 
.subscribe(...); 

を、私はそれが他の部分で組成物に使用可能で聞かせて、あなたも、ここでのすべてのサブスクライブだけで、観察を作成し、それを返すべきではないと言うだろうあなたのコードの。

+0

私は同じデータに対して呼び出される普通の関数に加入者を変更することを考えましたが、これはその使用を 'onNext'に限定しています。したがって、onStart、onCompleted、onErrorに残っているサブスクライバのロジックを実行する方法はありません – marknorkin

+0

'Subsription'sと' isUnsubscribe'メソッドを見ることができますか?それを行う方法はありますか?編集:しかし、まだ 'connect'メソッドに問題があります – marknorkin

+0

あなたが観測可能なサブスクライバ関数を作成し、入力に基づいて別の観測可能な連鎖を生成すると、エラー、onNext、処理を開始して終了できます。 –

関連する問題