私は無限のオブジェクトストリームを持っています。そして、私の要件は、同じキーを持つ観測可能なストリームのすべてのアイテムを同期して処理し、異なるキーを持つ他のすべてのアイテムを並行して処理する必要があることです。Rx.NET GroupByUntilグループの終了、スレッドの完了を待ちます
var results = observableStream
.GroupByUntil(item => item.Id, group =>
group.Throttle(TimeSpan.FromSeconds(30), scheduler))
.SelectMany(group =>
group
.ObserveOn(scheduler)
.Select(item => ProcessItem(item)));
var disposable = results.Subscribe(result => SaveResults(result));
私はProcessItem(item)
の実行が30秒未満かかり保証できるまで、コードがうまく機能:それを行うための最も簡単な方法は、(ほとんどの場所で述べたように)GroupByUntil
演算子を使用することです。それ以外の場合はgroup.Throttle(TimeSpan.FromSeconds(30), scheduler)
がグループのストリームを閉じ、新しいアイテムが到着して新しいスレッドで処理を開始する可能性が非常に高くなります。
基本的に私のスレッドは特定のキーを持つすべてのアイテムの処理を完了しており、durationSelector
のGroupByUntil
というオペレータパラメータで通知する必要があります。
これを達成するためのアイデアはありますか?前もって感謝します。
特定のキーの最後のキーを受け取ったことをどのように知っていますか? – NetMage
@NetMage実際、私は知りません。私が達成しようとしているのは、特定のグループを処理しているスレッドがジョブを完了し、キューに何もない場合にのみ、絞り込み(デバウンス)を開始する必要があるということです。 – Azat
'ProcessItem'は同期していますか?それは 'async'ですか? 'IObservable'を返しますか? –
Shlomo