私はあなたがRxについていくつか誤解していると思います。オブザーバブルにはフィルタがなく、あなたは '生きていない'というフィルタを追加して削除します。また、誰が購読しているかに基づいてデータを転送することもありません。
代わりに、コールチェーンを構築します。あなたは、addRecord
のものとremoveRecord
イベントのもののように、観測可能なソースから始めます。次に、これらの観測値を連鎖させて、Rx内のさまざまな観測点から新しいoperatorsを作成し、最終的に観測可能な観測値に登録します。サブスクリプションはチェーン全体をアクティブにし、ソースイベントが発生するとすべての演算子がトリガーされ、最終的にイベントはフィルタリングされていない場合はsubscribe
になります。
実際にあなたがRxで説明したことを行うことができます。たとえば、オブザーバブル上のフィルタを変更することは、シーケンスを別のシーケンスに投影し、毎回新しいシーケンスに切り替えるようにするオペレータのswitchMap
で比較的簡単に実行できます。例えば、filterSource.switchMap(filterFunction => Obs-1.filter(filterFunction))
。これより簡単な場合でも、最初のサブスクリプションの登録を解除し、Rxチェーンを再度設定することができます。しかし、関数のビルドを使用すると、多くのジャグリング状態が外れます。
しかし、実際にはこのような複雑な動作は必要ないと強く思っています。あなたが望むものは、次のように単純にアーカイブすることができます:
var Src-1 = fromEvent(dataSource, 'addRecord') // create the first source
var Src-2 = fromEvent(dataSource, 'removeRecord') // and the other source
var Obs-1 = Src-1.combineLatest(Src-2) // combine both sources
.filter(e => someCondition(e)) // filter the source
var Obs-2 = Obs-1.mergeMap(e => someOtherCondition(e) ? Change(e) : Rx.Observable.of(e)) // on someOtherCondition, either transform the source with the `Change(e)` function. Or keep it unchanged with `of(e)`
var Obs-3 = Obs-2.filter(e => anotherCondition(e)) // Filter again
var sub = Obs-3.subscribe() // activate the sequence.
私を訂正してくれてありがとう。各オブザーバ(OBS-1、OBS-2など)は、データを受信する独自のサブスクリプションを持つことができます。実際、オブザーバは動的に追加されます。これを行うにはどうすればいいですか? – user3130446
どのように動的に観測値を受信しますか?定期購読を意味しないのですか?私は、あなたが望むものの中でより具体的にする必要があると思っています。 Rxはリアクティブプログラミングに関するものであり、リアクティブプログラミングでは、データがどこから来ているかを理解することが重要です。ソースがクリアされると、残りの部分が落ち込み、間違った解決策を探しているようです。 – Dorus