2017-08-03 20 views
2

私は無限のオブジェクトストリームを持っています。そして、私の要件は、同じキーを持つ観測可能なストリームのすべてのアイテムを同期して処理し、異なるキーを持つ他のすべてのアイテムを並行して処理する必要があることです。Rx.NET GroupByUntilグループの終了、スレッドの完了を待ちます

var results = observableStream 
    .GroupByUntil(item => item.Id, group => 
     group.Throttle(TimeSpan.FromSeconds(30), scheduler)) 
    .SelectMany(group => 
     group 
      .ObserveOn(scheduler) 
      .Select(item => ProcessItem(item))); 

var disposable = results.Subscribe(result => SaveResults(result)); 

私はProcessItem(item)の実行が30秒未満かかり保証できるまで、コードがうまく機能:それを行うための最も簡単な方法は、(ほとんどの場所で述べたように)GroupByUntil演算子を使用することです。それ以外の場合はgroup.Throttle(TimeSpan.FromSeconds(30), scheduler)がグループのストリームを閉じ、新しいアイテムが到着して新しいスレッドで処理を開始する可能性が非常に高くなります。

基本的に私のスレッドは特定のキーを持つすべてのアイテムの処理を完了しており、durationSelectorGroupByUntilというオペレータパラメータで通知する必要があります。

これを達成するためのアイデアはありますか?前もって感謝します。

+2

特定のキーの最後のキーを受け取ったことをどのように知っていますか? – NetMage

+0

@NetMage実際、私は知りません。私が達成しようとしているのは、特定のグループを処理しているスレッドがジョブを完了し、キューに何もない場合にのみ、絞り込み(デバウンス)を開始する必要があるということです。 – Azat

+0

'ProcessItem'は同期していますか?それは 'async'ですか? 'IObservable 'を返しますか? – Shlomo

答えて

2

これは、この質問に非常によく似ています。A way to push buffered events in even intervalsです。

public static class ObservableDrainExtensions 
{ 
    public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source, 
     Func<TSource, IObservable<TOut>> selector) 
    { 
     return Observable.Defer(() => 
     { 
      BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit()); 

      return source 
       .Zip(queue, (v, q) => v) 
       .SelectMany(v => selector(v) 
        .Do(_ => { },() => queue.OnNext(new Unit())) 
       ); 
     }); 
    } 
} 

オペレータ、あなたの問題は非常に簡単になることを考える:

フォームその質問への答えは、オペレータDrainがあります

var results = observableStream 
    .GroupBy(item => item.Id) 
    .SelectMany(group => 
     group 
      .ObserveOn(scheduler) 
      .Drain(item => ProcessItem(item))); 

var disposable = results.Subscribe(result => SaveResults(result)); 

はA1、A2のように見えるの流れを考えると、 B1、A3、B2、C1、B3、C2,GroupByはIDでストリームを区切る。

A: A1, A2, A3 
B: B1, B2, B3 
C: C1, C2 

...およびDrainは、指定されたサブストリーム内の項目に対して、パラレルではなくシリアルで実行するようにします。

+0

良い解決策ですが、 'GroupBy'だけを使ってグループを破壊することはありません。膨大な数の一意のキーがあれば、メモリが足りなくなるかもしれません。 – Azat

関連する問題