2016-09-06 3 views
0

を与えられたソースの数が不明な参加しますは、以下のようなソースプロバイダ指定されたキー

IObservable<IEnumerable<string>> Results 

すべてのISourceから特定の文字列が返された場合。基本的に私はすべてのソースの交差点が必要です。

新しいソースが追加された場合は、すべてを再評価する必要があります。

私はこれに対する一般的な解決策を考え出すのに苦労しています。私が見たほとんどの解決策には、よく知られている情報源があります。 アイデアありがとうございます。

回答 もう一度考えてから、私は答えを思いついた。おそらくそれは上で改善することができますが、それは私のために働くようですので、誰か他の人が同じような問題を抱えている場合に参照するためにここに掲載します。返信する時間をとってくれたibebbsとShlomoに感謝します。 IntersectAllについては

//Arrange 
     var s1 = Substitute.For<ISource>(); 
     s1.ObserveData(Arg.Any<string>()).Returns(Observable.Return(new[] { "a", "b", "c", "d" })); 

     var s2 = Substitute.For<ISource>(); 
     s2.ObserveData(Arg.Any<string>()).Returns(Observable.Return(new[] { "b", "xx", "c", "d" })); 

     var s3 = Substitute.For<ISource>(); 
     s3.ObserveData(Arg.Any<string>()).Returns(Observable.Return(new[] { "yy", "b", "ff", "d" })); 

     var expected = new[] { "b", "d" }; 

     var sources = new[] { s1, s2, s3 }.ToObservable(); 

     var scheduler = new TestScheduler(); 
     var observer = scheduler.CreateObserver<IList<string>>(); 

     //Act 
     sources.Buffer(TimeSpan.FromMilliseconds(500), scheduler) 
      .Select(s => Observable.CombineLatest(s.Select(x => x.ObserveData("NoFilter")))) 
      .Switch() 
      .Select(x =>IntersectAll(x)) 
      .Do(x => Console.WriteLine($"Recieved {string.Join("," , x)}")) 
      .Subscribe(observer); 

     scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500).Ticks); 

     //Assert 
     observer.Messages.AssertEqual(
      OnNext<IList<string>>(0, s => s.SequenceEqual(expected)), 
      OnCompleted<IList<string>>(0)); 

、これについてどのようにIntersection of multiple lists with IEnumerable.Intersect()

+0

結果はプロパティまたはメソッドですか?プロパティの場合、フィルタ引数として渡されるべきは何ですか? – Shlomo

答えて

1

[OK]を2回目の試みと私はかなりあなたが必要とするものと確信しています)底部に含ま:

public interface ISource 
{ 
    IObservable<IEnumerable<string>> ObserveData(string filter); 
} 

public static class ArbitrarySources 
{ 
    public static IObservable<IEnumerable<string>> Intersection(this IObservable<ISource> sourceObservable, string filter) 
    { 
     return sourceObservable 
      .SelectMany((source, index) => source.ObserveData(filter).Select(values => new { Index = index, Values = values })) 
      .Scan(ImmutableDictionary<int, IEnumerable<string>>.Empty, (agg, tuple) => agg.SetItem(tuple.Index, tuple.Values)) 
      .Select(dictionary => dictionary.Values.Aggregate(Enumerable.Empty<string>(), (agg, values) => agg.Any() ? agg.Intersect(values) : values).ToArray());  
    } 
} 

public class IntersectionTest 
{ 
    internal class Source : ISource 
    { 
     private readonly IObservable<IEnumerable<string>> _observable; 

     public Source(IObservable<IEnumerable<string>> observable) 
     { 
      _observable = observable; 
     } 

     public IObservable<IEnumerable<string>> ObserveData(string filter) 
     { 
      return _observable; 
     } 
    } 

    [Fact] 
    public void ShouldIntersectValues() 
    { 
     TestScheduler scheduler = new TestScheduler(); 

     var sourceA = new Source(scheduler.CreateColdObservable(
      new Recorded<Notification<IEnumerable<string>>>(TimeSpan.FromSeconds(1).Ticks, Notification.CreateOnNext<IEnumerable<string>>(new string[] { "a", "b" })), 
      new Recorded<Notification<IEnumerable<string>>>(TimeSpan.FromSeconds(3).Ticks, Notification.CreateOnNext<IEnumerable<string>>(new string[] { "a", "b", "c" })) 
     )); 

     var sourceB = new Source(scheduler.CreateColdObservable(
      new Recorded<Notification<IEnumerable<string>>>(TimeSpan.FromSeconds(1).Ticks, Notification.CreateOnNext<IEnumerable<string>>(new string[] { "a", "c" })), 
      new Recorded<Notification<IEnumerable<string>>>(TimeSpan.FromSeconds(3).Ticks, Notification.CreateOnNext<IEnumerable<string>>(new string[] { "b", "c" })) 
     )); 

     var sources = scheduler.CreateColdObservable(
      new Recorded<Notification<ISource>>(TimeSpan.FromSeconds(1).Ticks, Notification.CreateOnNext<ISource>(sourceA)), 
      new Recorded<Notification<ISource>>(TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext<ISource>(sourceB)) 
     ); 

     var observer = scheduler.Start(() => sources.Intersection("test"), 0, 0, TimeSpan.FromSeconds(6).Ticks); 

     IEnumerable<string>[] actual = observer.Messages 
      .Select(message => message.Value) 
      .Where(notification => notification.Kind == NotificationKind.OnNext && notification.HasValue) 
      .Select(notification => notification.Value) 
      .ToArray(); 

     IEnumerable<string>[] expected = new [] 
     { 
      new [] { "a", "b" }, 
      new [] { "a" }, 
      new [] { "a", "c" }, 
      new [] { "b", "c" } 
     }; 

     Assert.Equal(expected.Length, actual.Length); 

     foreach (var tuple in expected.Zip(actual, (e, a) => new { Expected = e, Actual = a })) 
     { 
      Assert.Equal(tuple.Expected, tuple.Actual); 
     } 
    } 
} 

このアプローチは、新しいソースが追加され、再照会しない既存のソースの追加の利点を有するが、交差点に任意のソース値を発するたびに再計算されます。

+0

はい、うまく機能します。ありがとう – user630190

0

を参照してください。

public IObservable<IEnumerable<string>> From(this IObservable<ISource> sources, string filter) 
{ 
    return sources 
     .Scan(Observable.Empty<IEnumerable<string>>(), (agg, source) => Observable.Merge(agg, source.ObserveData(filter))) 
     .Switch(); 
} 

新しいソースが以前に放出されたsourcesすべてのソースから放出されるたびに持っていることを、注意してください再度ObserveDataメソッドが呼び出されました。したがって、この解決策は特にうまく調整されませんが、新しいソースを追加するとすべてが再評価される必要があります。要件

+0

一見すると、交差点を見つけるのではなく、すべてのソースをマージするように見えます。だからソース1が["a"、 "b"、 "c"]とソース2つの出力["b"、 "a"]を出力すれば、私は["a"、 "b"] – user630190

関連する問題