RXエンティティを使用したとき、私は次のスキームを持っています。必要な加入者がすべて仕事を終えたかどうかを確認するには?
Observerable1
|
------------------
| | |
S1(*) S2(*) S3 (onCompleted() starts another observable)
|
Observable2
|
S4(*)
私は*
と加入者のすべてが(S1,S2,S4
)自分の仕事を終えたときに知っておく必要があります。彼らは別のスレッドで実行することができるので、私はいくつかの同期メカニズムが必要ですか?rx-java
からいくつかのボックスソリューションがありますか?
現在の設計説明するためのいくつかのサンプルコード:S3で
@Component
public class BatchReader {
@Autowired List<Subscriber<List<Integer>>> subscribers;
public void start() {
ConnectableObservable<List<Integer>> observable1 = createObservable();
subscribers.forEach(observable1::subscribe);
observable1.connect();
}
private ConnectableObservable<List<Integer>> createObservable() {
return Observable.create((Subscriber<? super List<Integer>> subscriber) -> {
try {
subscriber.onStart();
while (someCondition) {
List<Integer> numbers = ...;
subscriber.onNext(numbers);
}
subscriber.onCompleted();
} catch (Exception ex) {
subscriber.onError(ex);
}
}).observeOn(Schedulers.newThread()).publish();
}
}
を私はロジック次き:
@Component
public class S3 extends Subscriber<List<Integer>> {
@Autowired AnotherBatchReader anotherBatchReader;
....
@Override
public void onCompleted() {
anotherBatchReader.start();
}
...
}
を、S4はAnotherBatchReaderに加入する:
@Component
public class AnotherBatchReader {
@Autowired S4<List<Foo>> subscriber4;
public void start() {
Observable<List<Foo>> observable2 = createObservable();
observable2.subscribe(subscriber4);
}
private Observable<List<Foo>> createObservable() {
return Observable.create(subscriber -> {
try {
subscriber.onStart();
while (someConditionBar) {
List<Foo> foo = ...;
subscriber.onNext(foo);
}
subscriber.onCompleted();
}
} catch (RuntimeException ex) {
subscriber.onError(ex);
}
});
}
}
のであり私が関心を持っているすべての加入者が自分の仕事をしたときに適切に通知される方法? rxはそれをボックス外でサポートしていますか?それとも、それをサポートするより優れたデザインがありますか?
編集: 私は別個の加入者を持っています。最後に*
(S1、S2、S3)の加入者は、データをxmlファイルに書き出します。しかし
- S1は、いくつかの仕事をして、
onNext()
でデータを受信し、ファイルに直接結果を書き込み - S2は、いくつかの仕事をして、
onNext()
でデータを受信したフィールドで結果を蓄積し、その後、onCompleted
- S3でそれを書き込み
onNext
でデータを受信し、何らかの作業を行い、結果をDBに書き込んだ後、onCompleted
が呼び出され、別の観測データが開始され、S4からデータが取得されます。 - S4は
onNext()
のデータを受け取り、ファイル
onNext()
で受信したデータから生成される結果は一意である必要があるため、私はS3でDBにデータを書き込む必要がある理由はあるが、私はObservable1
からバッチでデータを取得していますように私はすることができます」この一意性を保証するので、DBはそれを世話します。
そしてもちろんS3
S3に存在する結果の乗算はS2と比べて重要なので、私はS2
と同じことをすることはできません(すべての結果がメモリに蓄積されます)。
...我々はすべてそれを行ってきた - これは、XYの問題の場合は、あなたのユースケースは何ですか?なぜあなたは購読者を分けたのですか? –
@TassosBassoukos私はあなたの答えに質問を編集します – marknorkin
説明をありがとう。 –