2016-07-26 6 views
0

文字(A-Z)と数字(1-9)のストリームがあります。私は時間切れに到着する手紙に参加したい(これは変わることがあります)、すぐに数字をすぐに出します。どの機能がこれを行うのが最善であるとお考えですか?可変タイムアウトのバッファ選択メッセージ

サンプル動作するコード(わからないこれが正しいおよび/または良い解決策):

private BehaviorSubject<TimeSpan> sTimeouts = new BehaviorSubject<TimeSpan>(0.ms()); 

private IObservable<string> lettersJoined(IObservable<char> ob) 
{ 
    return Observable.Create<string>(observer => 
    { 
     var letters = new List<char>(); 
     var lettersFlush = new SerialDisposable(); 

     return ob.Subscribe(c => 
     { 
      if (char.IsUpper(c)) 
      { 

       if ((await sTimeouts.FirstAsync()).Ticks > 0) 
       { 
        letters.Add(c); 

        lettersFlush.Disposable = 
         VariableTimeout(sTimeouts) 
         .Subscribe(x => { 
          observer.OnNext(String.Concat(letters)); 
          letters.Clear(); 
         }); 

       } 
       else 
        observer.OnNext(letters.ToString()); 


      } 
      else if (char.IsDigit(c)) 
       observer.OnNext(c.ToString()); 
     } 

    } 
} 


private IObservable<long> VariableTimeout(IObservable<TimeSpan> timeouts) 
{ 
    return Observable.Create<long>(obs => 
    { 
     var sd = new SerialDisposable(); 
     var first = DateTime.Now; 

     return timeouts 
      .Subscribe(timeout => 
      { 
       if (timeout.Ticks == 0 || first + timeout < DateTime.Now) 
       { 
        sd.Disposable = null; 
        obs.OnNext(timeout.Ticks); 
        obs.OnCompleted(); 
       } 
       else 
       { 
        timeout -= DateTime.Now - first; 

        sd.Disposable = 
         Observable 
         .Timer(timeout) 
         .Subscribe(t => { 
          obs.OnNext(t); 
          obs.OnCompleted(); 
         }); 
       } 
      }); 

    }); 
} 

private void ChangeTimeout(int timeout) 
{ 
    sTimeouts.OnNext(timeout.ms()) 
} 


// I use the following extension method 
public static class TickExtensions 
{ 
    public static TimeSpan ms(this int ms) 
    { 
     return TimeSpan.FromMilliseconds(ms); 
    } 
} 

がタイムアウトを変更するには、私は単にプライベートタイムアウト変数を変更することができますが、おそらくそれのための件名は次のようになります必要ならばOK、それ以上はOK。

UPDATE

var scheduler = new TestScheduler(); 

var timeout = scheduler.CreateColdObservable<int>(
    ReactiveTest.OnNext(0000.Ms(), 2000), 
    ReactiveTest.OnNext(4300.Ms(), 1000)); 

var input = scheduler.CreateColdObservable<char>(
    ReactiveTest.OnNext(0100.Ms(), '1'), 
    ReactiveTest.OnNext(1600.Ms(), '2'), 
    ReactiveTest.OnNext(1900.Ms(), 'A'), 
    ReactiveTest.OnNext(2100.Ms(), 'B'), 
    ReactiveTest.OnNext(4500.Ms(), 'C'), 
    ReactiveTest.OnNext(5100.Ms(), 'A'), 
    ReactiveTest.OnNext(5500.Ms(), '5'), 
    ReactiveTest.OnNext(6000.Ms(), 'B'), 
    ReactiveTest.OnNext(7200.Ms(), '1'), 
    ReactiveTest.OnNext(7500.Ms(), 'B'), 
    ReactiveTest.OnNext(7700.Ms(), 'A'), 
    ReactiveTest.OnNext(8400.Ms(), 'A')); 

var expected = scheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(0100.Ms(), "1"), 
    ReactiveTest.OnNext(1600.Ms(), "2"), 
    ReactiveTest.OnNext(4100.Ms(), "AB"), 
    ReactiveTest.OnNext(5500.Ms(), "5"), 
    ReactiveTest.OnNext(7000.Ms(), "CAB"), 
    ReactiveTest.OnNext(7200.Ms(), "1"), 
    ReactiveTest.OnNext(9400.Ms(), "BAA")); 


// if ReactiveTest.OnNext(3800.Ms(), 1000) 
// then expected is ReactiveTest.OnNext(3800.Ms(), "AB") 

UPDATE#2

正しくサンプル入力としてsampleInputを想定し

答えて

2

ここを助けるかもしれないいくつかの事を。

最初の大理石図は問題を視覚化するのに役立ちますが、何かがうまくいくかどうかを証明するときは、ITestableObservable<T>のインスタンスでは規範的で単位テストになります。

第2に、私はあなたの解決策がどうあるべきかわかりません。私があなたの大理石の図を見ると、私はいくつかの相違を見る。ここでは、視覚化を支援するタイムラインを追加しました。ここで

    111111111122222222223 
Time: 123456789
Input: 1---2--A-B----C--A-B-1--B-A--A 
Output: 1---2----AB-------CAB-1-----BAA 

私はその後、私はユニット19で公開「CAB」の出力を参照してくださいユニット10 で公開「AB」の出力を参照してください。 さらに、私はユニット29で公開された "BAA"出力を参照してください。 しかし、これらは一定のタイムアウト間隔で発生する必要があります。 それでは、値の間のギャップが重要であると思うかもしれませんが、これはどちらかと言えば結構です。これは上の私のところに戻ります。合格または不合格の単位テストを提供してください。

第3に、lettersFlushタイプのSerialDisposableタイプを使用すると、実装を少し改善することができます。

は私は私は私があなたの大理石の図の意味だと思うものにいくつかの値を変更するには、いくつかの自由を取った

var scheduler = new TestScheduler(); 
var input = scheduler.CreateColdObservable<char>(
    ReactiveTest.OnNext(0100.Ms(), '1'), 
    ReactiveTest.OnNext(0500.Ms(), '2'), 
    ReactiveTest.OnNext(0800.Ms(), 'A'), 
    ReactiveTest.OnNext(1000.Ms(), 'B'), 
    ReactiveTest.OnNext(1500.Ms(), 'C'), 
    ReactiveTest.OnNext(1800.Ms(), 'A'), 
    ReactiveTest.OnNext(2000.Ms(), 'B'), 
    ReactiveTest.OnNext(2200.Ms(), '1'), 
    ReactiveTest.OnNext(2500.Ms(), 'B'), 
    ReactiveTest.OnNext(2700.Ms(), 'A'), 
    ReactiveTest.OnNext(3000.Ms(), 'A')); 

var expected = scheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(0100.Ms(), "1"), 
    ReactiveTest.OnNext(0500.Ms(), "2"), 
    ReactiveTest.OnNext(1000.Ms(), "AB"), 
    ReactiveTest.OnNext(2000.Ms(), "CAB"), 
    ReactiveTest.OnNext(2200.Ms(), "1"), 
    ReactiveTest.OnNext(3000.Ms(), "BAA")); 

次のコードブロックを作成するユニットテストを設定するのに役立ちます。

私が上記の@Shlomoで提供した非常に良い答えを使用すると、私はあいまいな大理石図を使用するだけで、さらなる問題を見ることができます。バッファ境界は、含まれる最後の値が発生した後に起こらなければならないので、これらのウィンドウは1つずつオフにする必要があります。

void Main() 
{ 
    var scheduler = new TestScheduler(); 
    var input = scheduler.CreateColdObservable<char>(
     ReactiveTest.OnNext(0100.Ms(), '1'), 
     ReactiveTest.OnNext(0500.Ms(), '2'), 
     ReactiveTest.OnNext(0800.Ms(), 'A'), 
     ReactiveTest.OnNext(1000.Ms(), 'B'), 
     ReactiveTest.OnNext(1500.Ms(), 'C'), 
     ReactiveTest.OnNext(1800.Ms(), 'A'), 
     ReactiveTest.OnNext(2000.Ms(), 'B'), 
     ReactiveTest.OnNext(2200.Ms(), '1'), 
     ReactiveTest.OnNext(2500.Ms(), 'B'), 
     ReactiveTest.OnNext(2700.Ms(), 'A'), 
     ReactiveTest.OnNext(3000.Ms(), 'A')); 

    var expected = scheduler.CreateColdObservable<string>(
     ReactiveTest.OnNext(0100.Ms(), "1"), 
     ReactiveTest.OnNext(0500.Ms(), "2"), 
     ReactiveTest.OnNext(1000.Ms()+1, "AB"), 
     ReactiveTest.OnNext(2000.Ms()+1, "CAB"), 
     ReactiveTest.OnNext(2200.Ms(), "1"), 
     ReactiveTest.OnNext(3000.Ms()+1, "BAA")); 

    /* 
        111111111122222222223 
    Time: 123456789
    Input: 1---2--A-B----C--A-B-1--B-A--A 
    Output: 1---2----AB-------CAB-1-----BAA 
    */ 

    var bufferBoundaries = //Observable.Timer(TimeSpan.FromSeconds(1), scheduler); 
      //Move to a hot test sequence to force the windows to close just after the values are produced 
      scheduler.CreateHotObservable<Unit>(
     ReactiveTest.OnNext(1000.Ms()+1, Unit.Default), 
     ReactiveTest.OnNext(2000.Ms()+1, Unit.Default), 
     ReactiveTest.OnNext(3000.Ms()+1, Unit.Default), 
     ReactiveTest.OnNext(4000.Ms()+1, Unit.Default)); 

    var publishedFinal = input 
     .Publish(i => i 
      .Where(c => char.IsLetter(c)) 
      .Buffer(bufferBoundaries) 
      .Where(l => l.Any()) 
      .Select(lc => new string(lc.ToArray())) 
      .Merge(i 
       .Where(c => char.IsNumber(c)) 
       .Select(c => c.ToString()) 
      ) 
     ); 

    var observer = scheduler.CreateObserver<string>(); 

    publishedFinal.Subscribe(observer); 
    scheduler.Start(); 

    //This test passes with the "+1" values hacked in. 
    ReactiveAssert.AreElementsEqual(
     expected.Messages, 
     observer.Messages); 

} 

// Define other methods and classes here 
public static class TickExtensions 
{ 
    public static long Ms(this int ms) 
    { 
     return TimeSpan.FromMilliseconds(ms).Ticks; 
    } 
} 

私は、Rxが確定的であることを指摘しているので、確定的なテストを作成することができます。あなたの質問は非常に良いものですが、私は@Shlomoが確かな最終回答を提供していると信じています。私たちは例/テストでRandomを使用しています。 ここで正確にすることは、生産における馬鹿な競争状態を防ぎ、読者がこれらの解決策をよりよく理解するのに役立ちます。

+0

公正な批判。ありがとう。私は、Hand-Waving™の暗くて古代の芸術でよく教育されています。 – Shlomo

+0

ありがとうリー、あなたの答えに非常に有用なコメント。私はすぐにそれを分析し、その間に私は質問を強化しました。 – zpul

2

をバッファリング中にタイムアウトの変更をサポートする洗練されたソリューション:

var charStream = "12ABCAB1BAA".ToObservable(); 
var random = new Random(); 
var randomMilliTimings = Enumerable.Range(0, 12) 
    .Select(i => random.Next(2000)) 
    .ToList(); 

var sampleInput = charStream 
    .Zip(randomMilliTimings, (c, ts) => Tuple.Create(c, TimeSpan.FromMilliseconds(ts))) 
    .Select(t => Observable.Return(t.Item1).Delay(t.Item2)) 
    .Concat(); 

まず、代わりに変更可能な変数を変更するのではなく、代わりに、バッファ・ウィンドウを表すために、いくつかのストリームを生成するのがベストだろう:私はTimeSpan Sをインクリメントのストリームを生成し、実証するので、のようなbufferBoundariesそれを呼ば

Input: 1---2--A-B----C--A-B-1--B-A--A 
Window: ---------*--------*---------*-- 
Output: 1---2----AB-------CAB-1-----BAA 

var bufferBoundaries = Observable.Range(1, 20) 
    .Select(t => Observable.Return(t).Delay(TimeSpan.FromSeconds(t))) 
    .Concat(); 

これは、次のようになります。

Seconds: 0--1--2--3--4--5--6--7--8--9--10 
BB  : ---1-----2--------3-----------4- 

...次のあなたはsampleInputアップ文字と数字のために別々のストリームにすることを分割し、それに応じてそれらを処理したい:

var letters = sampleInput 
    .Where(c => char.IsLetter(c)) 
    .Buffer(bufferBoundaries) 
    .Where(l => l.Any()) 
    .Select(lc => new string(lc.ToArray())); 

var numbers = sampleInput 
    .Where(c => char.IsNumber(c)) 
    .Select(c => c.ToString()); 

次に、2つのストリームをマージ:

var finalOutput = letters.Merge(numbers); 

最後に、それはですあなたがそれを助けることができるならば、同じ入力(私たちの場合はsampleInput)に2回購読するのは良い考えではありません。だから、私たちのケースでは、我々は次のようにlettersnumbers、およびfinalOutputを交換する必要があります。

var publishedFinal = sampleInput 
    .Publish(_si => _si 
     .Where(c => char.IsLetter(c)) 
     .Buffer(bufferBoundaries) 
     .Where(l => l.Any()) 
     .Select(lc => new string(lc.ToArray())) 
     .Merge(_si 
      .Where(c => char.IsNumber(c)) 
      .Select(c => c.ToString()) 
     ) 
    ); 
+0

興味深い返答をいただきありがとうございます。実際のケース(より複雑な場合)に実際に適用できるかどうかを深く分析する必要があります。それまでの間、私は質問を強化しました。 – zpul

関連する問題