2016-05-17 14 views
1

Rxで任意の数のストリームを「圧縮」しようとしていますが、要素は対応していますが、順不同で処理される可能性があります。各ストリームの要素には、それらを一致させるために使用できる識別子があります。例えば。要素は次のようになります。私たちは、同じキーを共有する要素を一致させたい場合はRxはプロパティで結合することによって多くのストリームを結合します

|-A-----------A 
|--B---------B- 
|-----C------C- 
|-----ABC-----ABC <- zip 

しかし、どのような:

public class Element 
{ 
    public string Key {get; set;} 
} 

通常、ジッパーはちょうどその発生インデックスで要素を組み合わせたのだろうか?私はもっ​​とこのように動作しますシーケンスを探しています:

|--2A-------1A---------- 
|----1B----------2B----- 
|------1C-----------2C-- 
|-----------1ABC----2ABC <- zipped by key 1 & 2 respectively 

(この例では、キーは1または2である)私はGroupJoinは、このシナリオに合っていることを感じるが、それは2つしか観測を提供していますそれらを連鎖させることは、かなり早く手を伸ばしてしまった。

私はAnd/Then/Whenも見ましたが、このシナリオのための構造をどのように構成するのか本当に分かりませんでした。

理想的には、結果セレクタの入力が同じキーを持つことが保証されている、結果セレクタを呼び出して提供できる拡張メソッドが必要です。

どのようにこの問題にアプローチしますか?

答えて

0

私はLinqPadで一緒にぶつかりました。それはあなたの大理石図の要件を満たしています。しかし、それは私が望むよりも面倒です。 Rx-Testing

void Main() 
{ 
    TestScheduler scheduler = new TestScheduler(); 
    /* 
|--2A-------1A---------- 
|----1B----------2B----- 
|------1C-----------2C-- 
|-----------1ABC----2ABC <- zipped by key 1 & 2 respectively 

    */ 
    var sourceA = scheduler.CreateColdObservable(
     ReactiveTest.OnNext(3, "2A"), 
     ReactiveTest.OnNext(12, "1A")); 
    var sourceB = scheduler.CreateColdObservable(
     ReactiveTest.OnNext(5, "1B"), 
     ReactiveTest.OnNext(17, "2B")); 
    var sourceC= scheduler.CreateColdObservable(
     ReactiveTest.OnNext(7, "1C"), 
     ReactiveTest.OnNext(20, "2C")); 

    var observer = scheduler.CreateObserver<string>(); 


    var query = Observable.Merge(sourceA, sourceB, sourceC) 
     .GroupBy(x => GetKey(x)) 
     .SelectMany(grp => grp.Select(x=>GetValue(x)) 
           .Take(3) 
           .Aggregate(new List<string>(), 
             (accumulator, current) => { 
              accumulator.Add(current); 
              return accumulator; 
             }) 
          .Select(acc=>CreateGroupResult(grp.Key, acc))); 

    query.Subscribe(observer); 
    scheduler.Start(); 

    ReactiveAssert.AreElementsEqual(
     new[]{ 
      ReactiveTest.OnNext(12, "1ABC"), 
      ReactiveTest.OnNext(20, "2ABC") 
     }, 
     observer.Messages 
    ); 

} 

// Define other methods and classes here 
private static string CreateGroupResult(string key, IEnumerable<string> values) 
{ 
    var combinedOrderedValues = string.Join(string.Empty, values.OrderBy(v => v)); 
    return string.Format("{0}{1}", key, combinedOrderedValues); 
} 

private static string GetKey(string message) 
{ 
    return message.Substring(0, 1); 
} 

private static string GetValue(string message) 
{ 
    return message.Substring(1); 
} 

Nugetの依存関係

関連する問題