2017-08-30 13 views
1

RXJSで約束のリストをどのようにチェーン化できますか?すべての約束は、前の問題が解決されたときに実行される必要があります(仕事はステートフルです)。私はこのような何かを考えていたRXJSで約束のリストを連鎖する方法は?

const workTodo = []; // an array of work 
const allWork = Observable.create(observer => { 
    const next=() => { 
    const currentTodo = workTodo.shift(); 
    if (currentTodo) { 
     doTodoAsync(currentTodo) 
     .then(result => observer.onNext(result)) 
     .then(next); 
    } else { 
     observer.onCompleted(); 
    } 
    }; 
    next(); 
}); 

私は今それをやっている方法は、原始的な感じ

const workTodo = []; // an array of work 
const allWork = Observable 
        .fromArray(workTodo) 
        .flatMap(doTodoAsync); 

をしかし、それは基本的に一度にすべての約束を実行します。

答えて

0

再帰についてはどうですか?

まず再帰関数を作成し、recursiveDoToDoそれを呼び出す:

const recursiveDoToDo = (currentTodo, index) => 
    Observable 
     .fromPromise(doTodoAsync(currentTodo)) 
     .map(resolved => ({resolved, index})); 

コードを上だけで観察可能にあなたのdoTodoAsyncをラップして、私たちが解決を約束を返すように結果をマップしindex配列、後で再帰的に使用する。

次に、.expand()演算子を使用してrecursiveDoToDoを再帰的に呼び出します。

recursiveDoToDo(worktodo[0], 0) 
    .expand(res => recursiveDoToDo(worktodo[res.index + 1], res.index + 1)) 
    .take(worktodo.length) 

長さである、あなたの再帰を行うために必要なすべての.expand()を再帰的に永遠に実行されますのでちょうど1によってインデックスをインクリメントすることで、.take()オペレータは、ストリームを終了するときに観察を伝えることがあり、あなたのworktodo

今、あなたは、単にそれを購読することができます:

recursion.subscribe(x => console.log(x)); 

をここではあなたがあなたの試みとかなり接近していたようだworking JS Bin

+0

動作しているようですが、それはまた、原始的なようです。私は私の再帰呼び出しが良いと思う。私はこれを行う簡単な演算子が必要なように感じる。 – nicojs

1

です。

あなたは同じよう.flatMapのために1の最大同時実行数を指定することができ、次のいずれか

Observable.fromArray(workTodo) 
.flatMap(doTodoAsync, 1) 

または同等.concatMapの代わり.flatMapを使用します。

Observable.fromArray(workTodo) 
.concatMap(doTodoAsync) 

それはより多くの慣用感じているように私はconcatMapを使用します。

UPDATE:DEMO

+0

私は 'concatMap'をテストしましたが、少し違うことをしているようです。 'doTodoAsync'を並列に実行し、結果を元の順序で結合します。ここでは、http://reactivex.io/documentation/operators/flatmap.htmlについても説明します。 flatMapの第2パラメータは 'number'として存在しないようです。 – nicojs

+0

@nicojsあなたが正確に何を意味するかわかりません。実行中のデモで答えを更新しました。これは、タスクが次々に実行されることを明確に示し、 'mergeMap/flatMap'はここに記述されているように' concurrency'パラメータを受け入れます:http://reactivex.io/rxjs/class/es6/Observable。js〜Observable.html#instance-method-mergeMap。 –

関連する問題