あなたは(つまり、すべてのアクションは税込と仮定。スタート/エンドアクションは同じソースから来る)これを行うにはswitchMap
とfilter
の組み合わせを使用することができ
あなたが開始/終了アクションはから来ている場合別のソースを使用すると、さらに簡単です。ソースストリームを分離するステップをスキップできます。
下記のコード例を実行して実際の動作を確認してください。
// this would be your source
const actions$ = new Rx.Subject();
// in this example controllActions and dataActions are derived from the same stream,
// if you have the chance to use 2 seperate channels from the start, do that
const controllActions$ = actions$
.filter(action => action.type === "END" || action.type === "START");
const dataActions$ = actions$
.filter(action => action.type !== "END" && action.type !== "START");
const epic$ = controllActions$
.switchMap(action => {
if (action.type === "END") {
console.info("Pausing stream");
return Rx.Observable.never();
} else {
console.info("Starting/Resuming stream");
return dataActions$;
}
});
epic$.subscribe(console.log);
// simulating some action emissions, the code below is _not_ relevant for the actual implementation
Rx.Observable.from([
"Some data, that will not be emitted...",
{type: "START"},
"Some data, that _will_ be emitted...",
"Some more data, that _will_ be emitted...",
{type: "END"},
"Some data, that will not be emitted...",
"Some data, that will not be emitted...",
{type: "START"},
"Some data, that _will_ be emitted...",
"Some more data, that _will_ be emitted..."
])
.concatMap(d => Rx.Observable.of(d).delay(400))
.subscribe(actions$);
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>