0

ストリームのフィルタのリストを設定する必要があります。ストリームのカスタマイズ可能なフィルタリング

ストリームには最初はフィルタがありませんが、時間の経過とともにアクティブなフィルタのリストが表示されます。

各フィルタの有効期間は厳密になります。

テストケースは次のとおりです。

var scheduler = new TestScheduler(); 

var input = scheduler.CreateColdObservable<char>(
    ReactiveTest.OnNext(0100.Ms(), '1'), 
    ReactiveTest.OnNext(0200.Ms(), 'A'), 
    ReactiveTest.OnNext(0300.Ms(), '2'), 
    ReactiveTest.OnNext(0400.Ms(), 'B'), 
    ReactiveTest.OnNext(0500.Ms(), 'A'), 
    ReactiveTest.OnNext(0600.Ms(), 'B'), 
    ReactiveTest.OnNext(0700.Ms(), '5'), 
    ReactiveTest.OnNext(0800.Ms(), 'A'), 
    ReactiveTest.OnNext(0900.Ms(), 'C')); 

// filters 
// A between 70ms -> 550ms 
// B between 330ms -> 400ms 
// modeled as a string observable where: 
// first word is the char to filter 
// second word are the msecs duration of the filter 

var filters = scheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(0070.Ms(), "A 480"), 
    ReactiveTest.OnNext(0330.Ms(), "B 70") 
); 

var expected = scheduler.CreateColdObservable<char>(
    ReactiveTest.OnNext(0100.Ms(), '1'), 
    ReactiveTest.OnNext(0300.Ms(), '2'), 
    ReactiveTest.OnNext(0600.Ms(), 'B'), 
    ReactiveTest.OnNext(0700.Ms(), '5'), 
    ReactiveTest.OnNext(0800.Ms(), 'A'), 
    ReactiveTest.OnNext(0900.Ms(), 'C')); 

は、あなたがこれを行うための最良のRx-ソリューションがある私をお勧めすることはできますか?

PS:私は

public static class TickExtensions 
{ 
    public static long Ms(this int ms) 
    { 
     return TimeSpan.FromMilliseconds(ms).Ticks; 
    } 
} 
+0

コードはコンパイルされません。私は 'Ms()'が長く返るべきだと思います。 'filters'は' CreateColdObservable 'のように見えます。 –

+0

@LeeCampbell right、これを修正しました。 – zpul

答えて

1

まず、次の拡張メソッドを使用しています、あなたの入力は次のようになります。

var input = scheduler.CreateColdObservable<char>(
    ReactiveTest.OnNext(0100.Ms(), '1'), 
    ReactiveTest.OnNext(0200.Ms(), 'A'), 
    ReactiveTest.OnNext(0300.Ms(), '2'), 
    ReactiveTest.OnNext(0400.Ms(), 'B'), //You forgot this line 
    ReactiveTest.OnNext(0500.Ms(), 'A'), 
    ReactiveTest.OnNext(0600.Ms(), 'B'), 
    ReactiveTest.OnNext(0700.Ms(), '5'), 
    ReactiveTest.OnNext(0800.Ms(), 'A'), 
    ReactiveTest.OnNext(0900.Ms(), 'C')); 

また、@LeeCampbellが述べたように、あなたのMs拡張メソッドが型を返す必要がありますlongTimeSpanではありません。

var activeFilters = filters 
    .Select(s => s.Split(' ')) 
    .Select(s => Tuple.Create(s[0][0], TimeSpan.FromMilliseconds(int.Parse(s[1])))) 
    .Select(t => Observable.Timer(t.Item2, scheduler).Select(_ => t.Item1).StartWith(t.Item1)) 
    .MergeCombineLatest(true) 
    .StartWith(new List<char>()); 

var output = activeFilters.Publish(_activeFilters => 
     input.Join(_activeFilters, 
      _ => Observable.Return(1), 
      t => _activeFilters, 
      (c, filterList) => Tuple.Create(c, filterList) 
     ) 
    ) 
    .Where(t => !t.Item2.Contains(t.Item1)) 
    .Select(t => t.Item1); 

var observer = scheduler.CreateObserver<char>(); 
output.Subscribe(observer); 
scheduler.Start(); 

ReactiveAssert.AreElementsEqual(
    expected.Messages, 
    observer.Messages); 

activeFiltersフィルタの下に、現在の文字のリストを発する観測可能である:ここでは

は私が思いついた解決策です。積極的にフィルタリングされたリストが変更されると、activeFiltersが新しいListを発行します。同じ文字が複数のフィルタを持つことができるので、リストは必ずしも一意ではないことに注意してください。

アクティブになっているフィルタをいつでも特定できたら、そのリストを入力に追加できます。

コードがSystem.Collections.Immutable Nugetパッケージを使用して、この拡張メソッドを、必要とします。

public static IObservable<IList<T>> MergeCombineLatest<T>(this IObservable<IObservable<T>> outer, bool removeCompleted) 
{ 
    return outer 
     .SelectMany((inner, i) => inner 
      .Materialize() 
      .SelectMany(nt => nt.Kind == NotificationKind.OnNext 
       ? Observable.Return(Tuple.Create(i, nt.Value, true)) 
       : nt.Kind == NotificationKind.OnCompleted 
        ? removeCompleted 
         ? Observable.Return(Tuple.Create(i, default(T), false)) 
         : Observable.Empty<Tuple<int, T, bool>>() 
        : Observable.Throw<Tuple<int, T, bool>>(nt.Exception) 
      ) 
     ) 
     .Scan(ImmutableDictionary<int, T>.Empty, (dict, t) => t.Item3 ? dict.SetItem(t.Item1, t.Item2) : dict.Remove(t.Item1)) 
     .Select(dict => dict.Values.ToList()); 
} 

MergeCombineLatestは、観測の観測可能を取り、子観測のそれぞれからの最新の値のリストを発します。 removeCompletedが真である場合、子観測可能性が完了すると、リストは1つ縮小されます。 removeCompletedがfalseの場合、最後の値はリストに残り、連続して永遠にリストされます。

これをやっている面白い方法があれば、私は大いに義務づけられます。

+0

非常に便利な答えをありがとう!私はできるだけ早くそれをチェックします。 – zpul

+0

あなたのソリューションからインスパイアされ、私は次のようになりました。 https://gist.github.com/zpul/fcc82cb0e963ab69ebf7ae8fc7d95b2f あなたはどう思いますか? – zpul

+0

Rx(または任意の機能コード)の大きな点は、副作用/状態の問題やスレッドセーフの問題を回避できることです。 'List 'はスレッドセーフではありません。 https://msdn.microsoft.com/en-us/library/6sh2ey19.aspx#Anchor_10を参照してください。同時にフィルタを追加/削除しようとすると、 'excludeFilters'変数を誤って改ざんしている他の関数も問題になる可能性があります。 – Shlomo