2017-02-08 9 views
0

私は観測対象からゆっくりとライブストリーミングされたコードをいくつか持っています。私は、永続化されたオブジェクトからのライブストリームとそのタイムスタンプを別の観測でシミュレートしたいと思います。タイムスタンプ付きの配列から1つのスケジュールObservableを構築する方法

私は配列上でObservable.Return.Delayを使って何かを作ってから、単一の観測値にマージすることができました。私はこれが正しいアプローチではないと感じ、スレッドプールがいっぱいになるとタイミングが失敗する可能性があるため、すべてのアイテムに必要なスレッドがある可能性があります。

var data = Enumerable.Range(1, 10); 

var observable = data 
    .Select((x, idx) => Observable.Return(x).Delay(DateTimeOffset.Now.AddSeconds(idx*3))) 
    .Merge(); 

静的データをスケジュールする最良の方法は何ですか?

+0

テスト目的ですか? – Shlomo

+0

UATでプロダクション環境を再生するためのものです。 – KrisG

+0

'Merge'の代わりに' .Concat'のようなものを使用し、スケジューラにあふれさせる 'data'の非常に大きなセットを使用することができます。ただし、これは相対的なタイムスタンプがある場合にのみ有効です。あなたは、単に私が想定している生成された 'idx'を実際に使っているものではないと考えています(代わりにログやイベントストリームから読む) –

答えて

0

ロットと私は最善の解決策である全くわかりません。

私はそれがこのために設計されたユーティリティメソッドを使用してのように感じているので、可能性が低いソリューションを探してかなりきれいのカップル...

public static IObservable<TSource> IntervalSeries<TSource>(this IEnumerable<TSource> items, Func<TSource, TimeSpan> delayResolver) 
{ 
    return items.Select(x => Observable.Return(x).DelaySubscription(delayResolver(x))).Concat(); 
} 

public static IObservable<TSource> IntervalSeries<TSource>(this IEnumerable<TSource> items, Func<TSource, TimeSpan> delayResolver) 
{ 
    return items.Select(x => Observable.Empty<TSource>().Delay(delayResolver(x)).Concat(Observable.Return(x))).Concat(); 
} 

おそらく最良の解決策を見つけた

副作用に苦しむのは、Daniel C. WeberのGenerateメソッドです。referred。私が見つけたまで私はそれをかなり把握していませんでしたが、より詳細な答えはthisです。

public static IObservable<TSource> IntervalSeries<TSource>(this IEnumerable<TSource> items, Func<TSource, TimeSpan> delayResolver) 
{ 
    return Observable.Generate(
     items.GetEnumerator(), 
     x => x.MoveNext(), 
     x => x, 
     x => x.Current, 
     x => delayResolver(x.Current)); 
} 
0

最も簡単な方法はObservable.Intervalメソッドを使用すると思います。これをテストに使用して再利用する必要がある場合は、結果の遅延を処理するメソッドを作成できます。 Timestamped

static IObservable<TSource> DelayObservable<TSource>(IEnumerable<TSource> source, TimeSpan delay) 
{ 
    var observableSource = source.ToObservable(); 

    // Use start with to fire one off right away 
    var interval = Observable.Interval(delay).StartWith(0); 

    // Zip so when the enumerable is done interval is auto unsubscribed 
    return interval.Zip(observableSource, (d, S) => S); 
} 

は、その後、使用することができますRxのパッケージの一部として便利なクラスがあります

var observable = DelayObservable(Enumerable.Range(1, 10), TimeSpan.FromSeconds(3)) 
+0

私は配列の各項目に関連する異なる時刻を持っており、配列内の項目ごとに特定の遅延が必要です。申し訳ありませんが、私のサンプルコードはこれを明確にしませんでした。私は2つの観測値を圧縮する考えが好きです。 – KrisG

+0

FYI 'var interval = Observable.Interval(遅延).StartWith(0);' var interval = Observable.Timer(TimeSpan.Zero、delay); ' –

+0

@KrisG私はあなたのコードを実行しました。それぞれの間に3秒。あなたは3秒間、次に6秒間、そして各出力間で9秒間の遅延を探していますか? – CharlesNRice

0

のようにそれを呼び出すことができます。是非、あなたはする必要はありません。ここでは例です:

var timestampedObjects = Enumerable.Range(0, 5) 
    .Select(i => Timestamped.Create(i, DateTimeOffset.Now + TimeSpan.FromSeconds(i*i))) 
    .ToList(); 

var observable = timestampedObjects.ToObservable() 
    .SelectMany(t => Observable.Timer(t.Timestamp).Take(1).Select(_ => t.Value)); 

observable.Subscribe(i => Console.WriteLine($"{DateTimeOffset.Now}: {i}")); 

これはTimerためDelayを下塗りを除き、あなたのソリューションとほぼ同じです。 Delayは完全に観測可能であるがタイムシフトのすべてのために要素間の時間差を保存することを意図している。 Timerは、単一のアイテムを絶対的にスケジューリングするためのものです。

Selectと、それに続くMergeは、SelectManyに相当します。

あなたのソリューションはうまくいくはずです。スレッドの問題については心配しませんが、Rxはスレッドリソースのスケジューリングと結合にかなり効率的です。

もう1つの方法は、パッケージMicrosoft.Reactive.Testingです。このパッケージを使用すると、仮想の時間、時間、または年単位で予定を立て、瞬時に実行できます。単体テストには最適ですが、何か他の点ではやや面倒です。

1

Generateのオーバーロードがあり、アイテムを放出するタイミングは正確にDateTimeOffsetです。 Take a look

署名は複雑に見えますが、かなり簡単です。 timestamp-functionは、前の項目が発行されたときにのみ評価されるため、使用するタイムスタンプが昇順であることを確認する必要があります。

ここからタイムスタンプを取得していますか?あなたは別の観測を述べました。あなたの例を強化することができたら、私はいくつかの作業コードを生成しようとする可能性があります。

0

このようにPlay演算子を使用して、データでフィードすることができます(将来的にはタイムシフトされたものもあります)。私はNotification<T>を使用したので、完成とエラーのテストもサポートします。

public static IObservable<T> Play<T>(this IEnumerable<Timestamped<Notification<T>>> source, IScheduler scheduler = null) 
{ 
    if (scheduler == null) scheduler = Scheduler.Default; 

    return Observable.Create<T>(observer => new CompositeDisposable(source 
     .Where(tn => tn.Timestamp > scheduler.Now) 
     .Select(tn => scheduler.Schedule(tn.Timestamp,() => tn.Value.Accept(observer))))); 
} 

テスト:このいずれかに異なった答えの

var events = Enumerable.Range(0, 10) 
    .Select(i => Timestamped.Create(i, DateTimeOffset.Now.AddSeconds(3 * i))) 
    .ToList(); 
var sch = new TestScheduler(); 
var testee = events 
    .Select(e => Timestamped.Create(Notification.CreateOnNext(e.Value), e.Timestamp)) 
    .Play(sch); 
var result = new List<Timestamped<int>>(); 
using (testee.Timestamp(sch).Subscribe(i => result.Add(i))) 
    sch.Start(); 
Assert.True(result.SequenceEqual(events)); 
関連する問題