2016-09-13 10 views
0

RXエンティティを使用したとき、私は次のスキームを持っています。必要な加入者がすべて仕事を終えたかどうかを確認するには?

Observerable1 
     | 
------------------ 
|  |   | 
S1(*) S2(*)  S3 (onCompleted() starts another observable) 
        | 
       Observable2 
        | 
       S4(*) 

私は*と加入者のすべてが(S1,S2,S4)自分の仕事を終えたときに知っておく必要があります。彼らは別のスレッドで実行することができるので、私はいくつかの同期メカニズムが必要ですか?rx-javaからいくつかのボックスソリューションがありますか?

現在の設計説明するためのいくつかのサンプルコード:S3で

@Component 
public class BatchReader { 
    @Autowired List<Subscriber<List<Integer>>> subscribers; 

    public void start() { 
     ConnectableObservable<List<Integer>> observable1 = createObservable(); 
     subscribers.forEach(observable1::subscribe); 
     observable1.connect(); 
    } 

    private ConnectableObservable<List<Integer>> createObservable() { 
     return Observable.create((Subscriber<? super List<Integer>> subscriber) -> { 
      try {  
       subscriber.onStart(); 

       while (someCondition) { 
        List<Integer> numbers = ...; 
        subscriber.onNext(numbers); 
       } 

       subscriber.onCompleted(); 
      } catch (Exception ex) { 
       subscriber.onError(ex); 
      } 
     }).observeOn(Schedulers.newThread()).publish(); 
    } 
} 

を私はロジック次き:

@Component 
public class S3 extends Subscriber<List<Integer>> { 
    @Autowired AnotherBatchReader anotherBatchReader; 
    .... 
    @Override 
    public void onCompleted() { 
     anotherBatchReader.start(); 
    } 
    ... 
} 

を、S4はAnotherBatchReaderに加入する:

@Component 
public class AnotherBatchReader { 
    @Autowired S4<List<Foo>> subscriber4; 

    public void start() { 
     Observable<List<Foo>> observable2 = createObservable(); 
     observable2.subscribe(subscriber4); 
    } 

    private Observable<List<Foo>> createObservable() { 
     return Observable.create(subscriber -> { 
      try { 
        subscriber.onStart(); 
        while (someConditionBar) { 
         List<Foo> foo = ...; 
         subscriber.onNext(foo); 
        } 
        subscriber.onCompleted(); 
       } 
      } catch (RuntimeException ex) { 
       subscriber.onError(ex); 
      } 
     }); 
    } 
} 

のであり私が関心を持っているすべての加入者が自分の仕事をしたときに適切に通知される方法? rxはそれをボックス外でサポートしていますか?それとも、それをサポートするより優れたデザインがありますか?

編集: 私は別個の加入者を持っています。最後に*(S1、S2、S3)の加入者は、データをxmlファイルに書き出します。しかし

  • S1は、いくつかの仕事をして、onNext()でデータを受信し、ファイルに直接結果を書き込み
  • S2は、いくつかの仕事をして、onNext()でデータを受信したフィールドで結果を蓄積し、その後、onCompleted
  • S3でそれを書き込みonNextでデータを受信し、何らかの作業を行い、結果をDBに書き込んだ後、onCompletedが呼び出され、別の観測データが開始され、S4からデータが取得されます。
  • S4はonNext()のデータを受け取り、ファイル

onNext()で受信したデータから生成される結果は一意である必要があるため、私はS3でDBにデータを書き込む必要がある理由はあるが、私はObservable1からバッチでデータを取得していますように私はすることができます」この一意性を保証するので、DBはそれを世話します。

そしてもちろんS3 S3に存在する結果の乗算はS2と比べて重要なので、私はS2と同じことをすることはできません(すべての結果がメモリに蓄積されます)。

+0

...我々はすべてそれを行ってきた - これは、XYの問題の場合は、あなたのユースケースは何ですか?なぜあなたは購読者を分けたのですか? –

+0

@TassosBassoukos私はあなたの答えに質問を編集します – marknorkin

+0

説明をありがとう。 –

答えて

1

ありがとうございます。既存の演算子を慎重に適用すると、コードが最小限に抑えられます。今、私はすべての詳細を持っていますが、何をやっていることは、このようにたくさん感じていない:もちろん

Observable<T> observable1 = ... 
     .share(); 

Observable<?> S1 = S1(observable1); 
Observable<?> S2 = S2(observable1); 
Observable<?> S3 = S3(observable1); 
Observable<?> S4 = defer(() -> readFromDatabase()).compose(S4::generate); 

Observable.merge(S1,S2,S3.ignoreElements().switchIfEmpty(S4)) 
    .ignoreElements() 
    .onComplete(...) 
    .subscribe(); 

を、詳細はオリジナルの観測可能に高温または低温であるかどうかに応じて異なることになり、 S[1-4]の詳細です。

また、すべてSubscribersをあなた自身のために動かそうとしないでください。そうすれば、フレームワークがあなたのためにそれを行うようにしてください。:

S4 = Observable.create(SyncOnSubscribe.generateStateless(
     observer -> observer.onNext(<get List<Foo>>) 
    )) 
    .takeWhile(list -> someConditionBar); 

編集: - これは、メーリングリストの、あまりにも複雑に見えます

関連する問題