2016-08-29 1 views
2

maxEntriesの最大値までサーバーへの呼び出しをバンドルしようとしていますが、maxWait msよりも長く待機したくありません。これは以前はRxJS 4ではwindowWithTimeOrCount()として利用可能でしたが、RxJS 5では削除されました。RxJS - maxWaitとmaxElementsウィンドウのwindowWhen()を使用

ウィンドウの最後の要素が失われていることを除いて、すべてがうまくいっています。そして、「失われた」と言えば、それは今私が感じる方法です。私が間違っていることを教えてくれるRxJSの達人? (いないウィンドウに)失われtoggleSubject.next(null)if (count > (maxEntries))によるトリガ

private chunk(queue: Observable<CacheEntry>, maxEntries: number, maxWait: number): Observable<Observable<CacheEntry>> { 

    // We have an incoming stream of CacheEntries to be retrieved. We want to bundle these in units of max maxEntries 
    // but wait no longer than max maxWait ms. We return an Observable, that emits Observables of CacheEntries that 
    // complete after maxEntries/maxWait (whatever comes first). 
    const toggleSubject = new Subject<void>(); 

    return queue 

    // Start emitting a new Observable every time toggleSubject emits. 
    // (bufferWhen() wouldn't work as we have to count the elements as they come and buffer only gives us the 
    // complete collection) 
     .windowWhen(() => toggleSubject) 

     // map() is called once for every window (maxEntries/maxWait) 
     // the inner do() is called for every element in the window, allowing us to set up the timeout callback and to 
     // count all elements, then emitting on toggleSubject, triggering a new Observable. 
     // (We have to map() here - instead of an outer do() - because otherwise the original obs would be streamed 
     // and the hooked up version with the inner do() would never be called.) 
     .map((obs) => { 
     // counts the number of cacheEntries already in this stream 
     let count = 0; 
     // flag to kill the timeout callback 
     let done = false; 
     // we have to return an Observable 
     return obs.do(() => { 
      count++; 
      if (count === 1) { 
       // we start counting when the first element is streamed. 
       IntervalObservable.create(maxWait).first().subscribe(() => { 
       if (!done) { 
        //trigger due to maxWait 
        toggleSubject.next(null); 
       } 
       }); 
      } 
      if (count > (maxEntries)) { 
       done = true; 
       // trigger due due to maxEntries(' + maxEntries + ')'); 
       toggleSubject.next(null); 
      } 
      } 
     ); 
     }); 
    } 

素子。

EDIT:maxTimeは、新しいObservableの最初の要素がプッシュされる瞬間を開始します。 if (count === 1)。これはa)私が窓口Observablesの内部からmap()で作業している理由とb)重要です。なぜなら、これは必須の動作だからです。

例:maxElements:100、maxWait:100. 101要素はt = 99でプッシュされます。期待される挙動:t = 99で、100要素のObservableがプッシュされます。 1要素が残されます。カウンタ+タイマリセット。 t = 199で、第2の「チャンク」のカウンタが期限切れとなり、Observableを1要素でプッシュします。

(この例ではBrandons(あろう回答)コードを参照して - 私はそれを正しく読み取る場合 - 一つの元素で、T = 100で、100個の要素以降1ミリ秒とT = 99で観測する観測を押します。 )

答えて

2

ええ、このような副作用のためにmapを使いたくありません。あなたが気づいたように、あなたはアイテムを落とすことになります。

ここでは、あなたが望むことをすると思われる一般的な方法があります。

注:現在、RXJS 5には、このオーバーロードのタイプ定義があるissueが公開されています。 TypeScriptでコンパイルできるような型キャストをいくつか追加しました。

chunk<T>(queue: Observable<T>, maxEntries: number, maxWait: number): Observable<Observable<T>> { 
    // use publish() so that we can subscribe multiple times to the same stream of data. 
    return queue.publish(entries => { 
     // observable which will trigger after maxWait 
     const timer = IntervalObservable.create(maxWait); 
     // observable which will trigger after maxEntries 
     const limit = entries.take(maxEntries).last(); 
     // observable which will trigger on either condition 
     const endOfWindow = limit.takeUntil(timer); 

     // use endOfWindow to close each window. 
     return entries.windowWhen(() => endOfWindow) as Observable<T>; 
    }) as Observable<Observable<T>>; 
} 

編集:

最初の項目は、各ウィンドウに到着した後、あなたはそうのようにそれを行うことができるまであなたはタイマーが起動しない場合:

chunk<T>(queue: Observable<T>, maxEntries: number, maxWait: number): Observable<Observable<T>> { 
    // use publish() so that we can subscribe multiple times to the same stream of data. 
    return queue.publish(entries => { 
     // observable which will trigger after maxWait after the first 
     // item in this window arrives: 
     const timer = entries.take(1).delay(maxWait); 
     // observable which will trigger after maxEntries 
     const limit = entries.take(maxEntries).last(); 
     // observable which will trigger on either condition 
     const endOfWindow = limit.takeUntil(timer); 

     // use endOfWindow to close each window. 
     return entries.windowWhen(() => endOfWindow) as Observable<T>; 
    }) as Observable<Observable<T>>; 
} 
+0

Cool。ありがとう。私のアプローチはやや複雑だった... – RAlfoeldi

+0

うーん...いいよね、でもトリックはしないよ。 'publish() 'Observable が返され、Observable >が返されます。 私はあなたのコードを読んでいる方法、タイマーは独立して(ストリームに何が起こるかにかかわらずすべてのmaxWait ms)トリガーします。それが私が 'windowWhen()'の内部からフィードバックを得ようとしている理由です。最初の要素がいつ来るかを数え始めます。 (op editを参照してください) – RAlfoeldi

+0

このオーバーロードは 'publish(func :(Observable )=> Observable です):Observable 'です。つまり、publish()は、あなたが提供するファクトリメソッドが返す任意のタイプのオブザーバブルを返します。この場合、Observable >を返します。 – Brandon

0

ソリューション私は、非同期スケジューラでwindowWhen()をトグルしています。その最後の値を受けてから任意の下流の演算子を防止 -

if (count === (maxEntries)) { 
    done = true; 
    this.LOGGER.debug(' - trigger due due to maxEntries(' + maxEntries + ')'); 
    Rx.Scheduler.async.schedule(()=>toggleSubject.next(null)); 
} 

問題はwindowWhen()はすぐに返された観測を完了したということでした。

ご質問いただきありがとうございます。私はここに投稿する前にRx.Scheduler.asyncなどを試しましたが、どういうわけかそれは動作していないようでした。

+0

非同期呼び出しが実行される前に、すべての項目が現在のウィンドウにプッシュされるため、アイテムがobservableに同時にプッシュされていると、ウィンドウが 'maxEntries'以上のエントリで終了することに注意してください。このような文章作成に伴う複雑さは、Subjectを使用せず、代わりに既存の演算子から演算子を構築するという一般的なアドバイスの理由の1つです。 – Brandon

関連する問題