2017-10-10 5 views
0

Iが観察myObservable有する:concatMap()等価が、非同期()

let myObservable = Observable.of(2000, 1000) 

concatMap()で:合計時間= 3000ミリ秒、元の順序で結果。 mergeMap()

myObservable.concatMap(v => Rx.Observable.of(v).delay(v)) 
// concatMap: 2000, concatMap: 1000 

:TOTAL TIME = 2000ミリ秒は、元の順序ではない結果。

myObservable.mergeMap(v => Rx.Observable.of(v).delay(v)) 
// mergeMap: 1000, mergeMap: 2000 

私はconcatMapのように元の順序で結果を取得する方法をしたいが、それぞれが代わりに完了するために、次のネストされた、観察を待つので非同期に観測可能なネストされた呼び出し:

// --- The behavior that I want --- 
myObservable.myCustomMap(v => Rx.Observable.of(v).delay(v)) 
// myCustomMap: 2000, myCustomMap: 1000 
// TOTAL TIME = 2000 millis 

がありますエレガントなソリューション?

編集:私はソース(myObservable)がこの特定の同期ケースだけでなく、非同期でも動作するソリューションを探しています。

+0

これには 'forkJoin'を使うべきです。適切な答えを書いてください。 – Maxime

+0

ソース 'myObservable'が非同期に放出されると思いますか?私は、ユースケースがObservable.of(2000、1000) 'よりも複雑であることを意味していますか? – martin

+0

@martinはい、ユースケースはより複雑で潜在的に非同期です – PerrierCitror

答えて

0

すべての観測値を同時に発生させるには、forkJoinを使用する必要があります。ここで

はコメントなしの例です:

const { Observable } = Rx; 

const obs$ = Observable 
    .of(3000, 3000, 1000) 
    .map(x => Observable.of(x).delay(x)); 

const allObsWithDelay$ = obs$.toArray(); 

const result$ = allObsWithDelay$ 
    .switchMap(arr => Observable.forkJoin(arr)); 

result$ 
    .do(console.log) 
    .subscribe(); 

そして説明と同じ:それらの値で

const { Observable } = Rx; 

// source observable, emitting simple values... 
const obs$ = Observable 
    .of(3000, 3000, 1000) 
    // ... which are wrapped into a different observable and delayed 
    .map(x => Observable.of(x).delay(x)); 

// use a reduce to build an array containing all the observables 
const allObsWithDelay$ = obs$.toArray(); 

const result$ = allObsWithDelay$ 
    // when we receive the array with all the observable 
    // (so we get one event, with an array of multiple observables) 
    .switchMap(arr => 

    // launch every observable into this array at the same time 
    Observable.forkJoin(arr) 
); 

// display the result 
result$ 
    .do(console.log) 
    .subscribe(); 

3000, 3000, 1000彼らは「として全体のプロセスは3秒(それらの最大値を取っています同時に発射される)

Working Plunkr:https://plnkr.co/edit/IRgEhdjCmZSTc6hSaVeF?p=preview

編集1:scanよりも優れているtoArrayを指摘し@PierreCitrorに感謝:)

+0

ニースマキシム、toArray()を呼び出すことでreduceを単純化することができます – PerrierCitror

+0

これは私が探していましたが、覚えていなかったことです@PerrierCitror:D – Maxime

+0

さて、私はこれが正確にOPが探しているものであるとは確信していません。 'obs $'が非同期の場合、 'switchMap'は全てのObservableを購読解除して再度購読します。たとえば、5つのリモート呼び出しがあり、 'obs $ 'が6番目のものを出力すると、前の5つのすべてが繰り返されます。 – martin

1

私はこのようにそれを行うだろう:

myObservable 
    .mergeMap((val, i) => Observable.forkJoin(
    Observable.of(i), 
    Observable.of(v).delay(v) 
)) 
    .scan((acc, ([i, result])) => { 
    acc[i] = result; 
    return acc; 
    }, {}) 
    .filter(allResults => { 
    // Whatever goes here 
    Object.keys(allResults) // list indices of all finished responses 
    }) 

これはどこ各単一のオブジェクト内のすべての応答を蓄積します応答には、mergeMapに到着したインデックスが割り当てられます。

filterでは、現在の状態をさらに伝播する必要があるかどうかを決定するロジックを書くことができます(たとえば、一定数の応答が到着するまで待ちます)。