私がしたい:別のIObservableが公開されるまで、IObservableにサブスクライブしてデータをバッファリングする方法は?
- はすぐに
IObservable<T>
に加入しますが、すぐに(すなわち、まだ私のIObserver<T>
からは見えない)が受信される任意のT
をバッファリングを開始します。 - いくつか作業してください。
- 作業が完了すると、私の
IObserver<T>
にバッファをフラッシュし、
を続けることは、サブスクリプションが発生した最初のものであることは極めて重要です。 T + 1のそれに私自身がIObservable<int>
s1
に依存していることIObservable<bool>
r
に加入し、私はこのような何かの後だ「大理石の図」形式で
...
Time T+1 2 3 4 5 6 7 8
s1:IObservable<int> 1 2 3 4 5 6 7 8
s2:IObservable<bool> t
r: IObservable<int> 1 3 4 5 6 7 8
2
...とIObservable<bool>
s2
。 s1
は私がコントロールしていないストリームです。s2
は私がコントロールするものです(件名)。作業が終わったらpublish
がオンになります。
私はSkipUntil
が私を助けてくれると思っていましたが、従属する前に受け取ったイベントをバッファリングしません。IObservable
が完了しました。
私はうまくいくと思っていたコードですが、SkipUntil
はバッファではないためです。
var are = new AutoResetEvent(false);
var events = Observable.Generate(1, i => i < 12, i => i + 1, i => i, i => TimeSpan.FromSeconds(1));
events.Subscribe(x => Console.WriteLine("events:" + x),() => are.Set());
var subject = new Subject<int>();
var completed = subject.AsObservable().Delay(TimeSpan.FromSeconds(5));
Console.WriteLine("Subscribing to events...");
events.SkipUntil(completed).Subscribe(x=> Console.WriteLine("events.SkipUntil(completed):"+ x));
Console.WriteLine("Subscribed.");
completed.Subscribe(x => Console.WriteLine("Completed"));
subject.OnNext(10);
are.WaitOne();
Console.WriteLine("Done");
私は様々なBuffer
方法について知っているが、私は本当に私のサブスクリプションの開始時に活動を調整し、ここではバッファリングしていないですと、彼らは、この場合には、適切ないないようです。これは、私の作品
public static class ObservableEx
{
public static IObservable<TSource> BufferUntil<TSource, TCompleted>(this IObservable<TSource> source, IObservable<TCompleted> completed)
{
var observable = Observable.Create<TSource>(o =>
{
var replaySubject = new ReplaySubject<TSource>();
var sub1 = source.Subscribe(replaySubject);
var query =
completed.Take(1).Select(
x => replaySubject.AsObservable());
var sub2 = query.Switch().Subscribe(o);
return new CompositeDisposable(sub1, sub2);
});
return observable;
}
}
ブリリアント、素晴らしい作品!今すぐそれを理解する;-) –