2017-05-12 11 views
1

私は観測可能なオブザーバブルを持っています。ここでは、内側のオブザーバブルはそれぞれ計算コストの高い値を1つ生成します。私は(SwitchFrugal?)Switchのような振る舞いをしたいが、何の作業が無駄にならない場所:作業を浪費しないスイッチ

  • に加入している現在の内部に観察がある場合、私はそれが
      を完了するまで、それから値を受けておきたいです
    • それは同様でSwitch
  • から解除すべきではない、現在の内部に観察完了すると、最新のインナーに観察
  • に加入する必要があります(現在の後のいずれかが存在する場合) 10

私は本当にこの動作を実装するのに苦労しています。これは既存の演算子を使用して実行可能ですか?または、これはObservable.Createを介して「最初から」行う必要がありますか?

Marble Diagram

+0

ではない[ 'concatMap()'](http://reactivex.io/rxjs/class/es6/Observable.js 〜Observable.html#instance-method-con catMap)または['exhaustMap()'](http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-exhaustMap)あなたが探しているものは? – martin

+0

@martin 'exhaust' /' exhaustMap'は、第2の箇条書きの点を満たしていないことを除いて、ほとんど私が望む 'SwitchFirst'と同じです:"一度現在の内部観測が完了すると、現在の後にあるもの)は " –

+0

"に登録する必要がありますあなたが探しているものの大理石図を追加できますか? @DanielT。 –

答えて

1

私はこのようにそれを行うだろう:私が観測を発する観察可能なソースをシミュレートしてい

const subject = new Subject(); 
const start = Scheduler.async.now(); 

// Simulated source Observable 
const source = Observable 
    .timer(0, 900) 
    .map(i => Observable.defer(() => { 
     console.log('Subscribe inner Observable:', i); 
     return Observable 
      .timer(0, 400) 
      .take(6) 
      .map(j => i.toString() + ':' + j.toString()); 
    })) 
    .share(); 

source 
    .merge(subject 
     .withLatestFrom(source) 
     .map(([v, observable]) => { 
      return observable; 
     }) 
    ) 
    .exhaustMap(obs => obs.finally(() => subject.next())) 
    .subscribe(val => console.log(Scheduler.async.now() - start, val)); 

。外側のオブザーバブルは内側のオブザーバブルより速く放出されるため、これはあなたの状況をシミュレートする必要があります。

次にsubjectが出るときだけsourceをチェーンにマージします。件名はfinallyオペレータでトリガされます。次のように

出力は第一インナー観測ソースから最新の発光を完了したときi === 2で観察され

Subscribe inner Observable: 0 
45 '0:0' 
452 '0:1' 
856 '0:2' 
1260 '0:3' 
1665 '0:4' 
2065 '0:5' 
Subscribe inner Observable: 2 
2069 '2:0' 
2472 '2:1' 
2876 '2:2' 
3280 '2:3' 
3683 '2:4' 
4084 '2:5' 
Subscribe inner Observable: 4 
4086 '4:0' 
4487 '4:1' 

注意こと。あなたがこれを比較すると

2065 '0:5' 
Subscribe inner Observable: 2 
2069 '2:0' 

:あなたはこのコードを実行する場合は、これらの3つの排出量(jsbinが今壊れているので、私は、ログインしてデモをすることはできません)との間には時間差がありませんことがわかります

source 
    .exhaustMap(obs => obs.finally(() => subject.next())) 
    .subscribe(console.log); 

これは次のように出力されますmerge()あなたがexhaustMapニーズソースから別の発光がありますされるまで待つことがわかりますせずにデフォルトの動作。時間のギャップに注目し、それがi === 3代わりの2で内部観察可能に加入していること:

Subscribe inner Observable: 0 
45 '0:0' 
449 '0:1' 
853 '0:2' 
1257 '0:3' 
1659 '0:4' 
2064 '0:5' 
Subscribe inner Observable: 3 
2748 '3:0' 
3151 '3:1' 
3553 '3:2' 
3953 '3:3' 
4355 '3:4' 
4759 '3:5' 
Subscribe inner Observable: 6 
5458 '6:0' 
5863 '6:1' 

編集:

観察可能なインナー同じ二回に加入(これらは冷たい観測されていると仮定して)私がすることができます回避するために、

私はランダムな間隔で以下の値を放射するようにソースを作ります:

0123私はすでに購読している観測可能なインデックスと次に何が必要なインデックスを追跡します。

source 
    .merge(subject 
     .withLatestFrom(source) 
     .map(([processedIndex, observableAndIndex]) => { 
      let observableIndex = observableAndIndex[1]; 
      if (processedIndex < observableIndex) { 
       return observableAndIndex; 
      } 
      return false; 
     }) 
     .filter(Boolean) 
    ) 
    .exhaustMap(([observable, index]) => observable.finally(() => subject.next(index))) 
    .subscribe(val => console.log(Scheduler.async.now() - start, val)); 

出力は非常に似ていますが、以前の観察可能完了は非常にすぐに我々が購読していない場合でも:

const source = Observable.range(0, 100, Scheduler.async) 
    .concatMap(i => Observable.of(i).delay(Math.random() * 3000)) 
    .map(i => Observable.defer(() => { 
     console.log('Subscribe inner Observable:', i); 
     return Observable 
      .timer(0, 400) 
      .take(4) 
      .map(j => i.toString() + ':' + j.toString()); 
    })) 
    .map((observable, index) => [observable, index]) 
    .share(); 

はその後、我々はsubject.next()を使用して処理し、単に私たちは望んでいないの観測を無視インデックスを送信しますそれを再度(すなわち、例えば観測12間の時間ギャップ'S):

Subscribe inner Observable: 0 
2803 '0:0' 
3208 '0:1' 
3615 '0:2' 
4016 '0:3' 
Subscribe inner Observable: 1 
4853 '1:0' 
5254 '1:1' 
5658 '1:2' 
6061 '1:3' 
Subscribe inner Observable: 2 
7814 '2:0' 
8218 '2:1' 
8622 '2:2' 
9026 '2:3' 
Subscribe inner Observable: 3 
9180 '3:0' 
9583 '3:1' 
9987 '3:2' 
10391 '3:3' 
Subscribe inner Observable: 5 
10393 '5:0' 
10796 '5:1' 
+1

これはとても感謝しています。私は唯一の問題は、内側の観測可能物が完成し、より新しい内側の観測物が到着しなければ、同じ観察可能物が再度予約購読されるということです。ソースオブザーバブルが単一のオブザーバブルを放出し、それから完了しない場合、同じインナーオブザーバブルに何度も繰り返し購読し続けるのではないでしょうか? (私は月曜日までこれを試すことができません。) –

+0

@TimothyShieldsそれは寒いObservablesにも当てはまります(あなたが熱いObservableを購読していれば、何もしません)。私の更新を見れば、あなたがすでに購読しているインデックスを追跡して、望んでいないObservablesを無視することができます。 – martin

関連する問題