maxEntriesの最大値までサーバーへの呼び出しをバンドルしようとしていますが、maxWait msよりも長く待機したくありません。これは以前はRxJS 4ではwindowWithTimeOrCount()
として利用可能でしたが、RxJS 5では削除されました。RxJS - maxWaitとmaxElementsウィンドウのwindowWhen()を使用
ウィンドウの最後の要素が失われていることを除いて、すべてがうまくいっています。そして、「失われた」と言えば、それは今私が感じる方法です。私が間違っていることを教えてくれるRxJSの達人? (いないウィンドウに)失われtoggleSubject.next(null)
if (count > (maxEntries))
によるトリガ
private chunk(queue: Observable<CacheEntry>, maxEntries: number, maxWait: number): Observable<Observable<CacheEntry>> {
// We have an incoming stream of CacheEntries to be retrieved. We want to bundle these in units of max maxEntries
// but wait no longer than max maxWait ms. We return an Observable, that emits Observables of CacheEntries that
// complete after maxEntries/maxWait (whatever comes first).
const toggleSubject = new Subject<void>();
return queue
// Start emitting a new Observable every time toggleSubject emits.
// (bufferWhen() wouldn't work as we have to count the elements as they come and buffer only gives us the
// complete collection)
.windowWhen(() => toggleSubject)
// map() is called once for every window (maxEntries/maxWait)
// the inner do() is called for every element in the window, allowing us to set up the timeout callback and to
// count all elements, then emitting on toggleSubject, triggering a new Observable.
// (We have to map() here - instead of an outer do() - because otherwise the original obs would be streamed
// and the hooked up version with the inner do() would never be called.)
.map((obs) => {
// counts the number of cacheEntries already in this stream
let count = 0;
// flag to kill the timeout callback
let done = false;
// we have to return an Observable
return obs.do(() => {
count++;
if (count === 1) {
// we start counting when the first element is streamed.
IntervalObservable.create(maxWait).first().subscribe(() => {
if (!done) {
//trigger due to maxWait
toggleSubject.next(null);
}
});
}
if (count > (maxEntries)) {
done = true;
// trigger due due to maxEntries(' + maxEntries + ')');
toggleSubject.next(null);
}
}
);
});
}
素子。
EDIT:maxTimeは、新しいObservableの最初の要素がプッシュされる瞬間を開始します。 if (count === 1)
。これはa)私が窓口Observablesの内部からmap()
で作業している理由とb)重要です。なぜなら、これは必須の動作だからです。
例:maxElements:100、maxWait:100. 101要素はt = 99でプッシュされます。期待される挙動:t = 99で、100要素のObservableがプッシュされます。 1要素が残されます。カウンタ+タイマリセット。 t = 199で、第2の「チャンク」のカウンタが期限切れとなり、Observableを1要素でプッシュします。
(この例ではBrandons(あろう回答)コードを参照して - 私はそれを正しく読み取る場合 - 一つの元素で、T = 100で、100個の要素以降1ミリ秒とT = 99で観測する観測を押します。 )
Cool。ありがとう。私のアプローチはやや複雑だった... – RAlfoeldi
うーん...いいよね、でもトリックはしないよ。 'publish() 'Observableが返され、Observable >が返されます。 私はあなたのコードを読んでいる方法、タイマーは独立して(ストリームに何が起こるかにかかわらずすべてのmaxWait ms)トリガーします。それが私が 'windowWhen()'の内部からフィードバックを得ようとしている理由です。最初の要素がいつ来るかを数え始めます。 (op editを参照してください) –
RAlfoeldi
このオーバーロードは 'publish(func :(Observable)=> Observable です):Observable 'です。つまり、publish()は、あなたが提供するファクトリメソッドが返す任意のタイプのオブザーバブルを返します。この場合、Observable >を返します。 –
Brandon