2016-11-25 6 views
1

私はイベントを送出し、観察ストリーム持っている:私は最後のNでイベントがなかった場合のイベント「を開催してください」N秒ごとを追加する別の一つに、このストリームをラップしたいReactiveで「繰り返しタイムアウト」をモデル化する方法は?

---*--*--*---*------------------------*---*---***--**----- 

を秒:

望ましい結果:(Xさんが新たに注入されたイベント

---*--*--*---*---X---X---X---X---X---X*---*---***--**---X- 

私は慣用的な反応方法でそれをモデル化しようとしているが、私は簡単にこれを行うようになる演算子を発見していません。タイムアウトは来ると思われるものですそれが繰り返されないことを除いて、実際にストリームを終了します。

答えて

0

これはわずかに「ハッキングされた」解決策です。コードが何をしているのかは明らかではありません。

public static IObservable<T> RepeatingTimeout<T>(ITestableObservable<T> events, TimeSpan timespan, T timeoutEvent, TestScheduler scheduler) 
{ 
    var groups = events 
     .GroupByUntil(x => 1, g => g, g => g.Throttle(timespan, scheduler)) // group events into observables so that each observable contains sequence of events where each event is sooner than timespan 
     .Select(x => 
      x.Concat(Observable.Interval(timespan, scheduler).Select(_ => timeoutEvent) // concatenate timeout events at intervals after each group of events 
      .StartWith(timeoutEvent)) 
      ) 
     .Switch(); // return events from latest observable 
    return groups; 
} 

そして、これは、それが動作することを示しているテストです:

[Test] 
public void RepeatedTimeout() 
{ 
    var scheduler = new TestScheduler(); 

    var events = scheduler.CreateHotObservable(
     OnNext(4, 1), 
     OnNext(7, 2), 
     OnNext(10, 3), 
     OnNext(14, 4), 
     OnNext(30, 5), 
     OnNext(35, 6), 
     OnCompleted<int>(36) 
     ); 

    var timespan = TimeSpan.FromTicks(5); 
    var timeoutEvent = -1; 

    var groups = RepeatingTimeout(events, timespan, timeoutEvent, scheduler); 

    groups.Do(x => Console.WriteLine("G " + scheduler.Clock + " " + x)); 


    var results = scheduler.Start(() => groups, 0, 0, 100); 
} 
+0

に感謝助けて。このテストでは私のために何も印刷されませんか? –

0

誰後でこの問題に遭遇した場合、これは私がなってしまったソリューションです:

public static class ObservableExtensions 
{ 
    public static IObservable<T> PulseDuringInactivity<T>(this IObservable<T> source, TimeSpan pulseInterval, IScheduler scheduler = null) 
    { 
     return source 
      //for each incoming event, we create a new observable stream, that contains this event being repeated infinitely many times over at our pulse interval 
      //these repeating values will serve as the pulse 
      .Select(sourceValue => RepeatingObservable(sourceValue, pulseInterval, scheduler)) 

      //However obviously that creates way too many events. What we we'll do is throw away all of these repeating observables, as soon as there's a single event coming in on any of our streams. 
      //this is what .Switch() does, and it immediately unsubscribes from the now-irrelevant streams, which can then be gc'd. For more information about the workings of this technique, 
      //check out the tests in ObservableExtensionTests.cs 
      .Switch(); 
    } 

    static IObservable<T> RepeatingObservable<T>(T valueToRepeat, TimeSpan repeatInterval, IScheduler scheduler) 
    { 
     return Observable.Interval(repeatInterval, scheduler ?? Scheduler.Default).Select(_ => valueToRepeat); 
    } 
} 
関連する問題