2012-05-31 1 views
7

私は株価の流れを表す観測値を持っています。私の観測可能なシーケンス上にオブザーバーがいない場合、私は価格のストリームを供給するリモートサーバーから切断できるようにしたいと思いますが、オブザーバーがDispose()を呼び出すまでそれをしたくありません。同様の方法で、最初の人がサブスクリプションを呼び出すと、リモートサーバーに再接続したいと思います。Observableのオブザーバ(数)を追跡しますか?

オブザーバブルに登録されているオブザーバの数を把握する方法はありますか?または、オブザーバーが購読またはディスポーザルに電話をしているときを知る方法でしょうか?

答えて

3

IObservable<T>は、実装可能なinterfaceです。インターフェイスのSubscribeメソッドでは、リストを内部的に管理することによってオブザーバを追跡することができます。

次のコードスニペットはMSDNのものです。

private List<IObserver<Location>> observers; 

public IDisposable Subscribe(IObserver<Location> observer) 
{ 
    if (! observers.Contains(observer)) 
     observers.Add(observer); 

    // ------- If observers.Count == 1 create connection. ------- 

    return new Unsubscriber(observers, observer); 
} 
private class Unsubscriber : IDisposable 
{ 
    private List<IObserver<Location>>_observers; 
    private IObserver<Location> _observer; 

    public Unsubscriber(List<IObserver<Location>> observers, IObserver<Location> observer) 
    { 
     this._observers = observers; 
     this._observer = observer; 
    } 

    public void Dispose() 
    { 
     if (_observer != null && _observers.Contains(_observer)) 
     _observers.Remove(_observer); 
     // ----------- if observers.Count == 0 close connection ----------- 
    } 
} 
+0

ええ、私はこれをどうしなければならないかと考えました。私は組み込みの科目のうちの1つを活用することができると期待していましたが、私はそのうちの1つ(ほとんどの場合BehaviorSubject)をラップする必要があるように見えますので、加入者を追跡できます。 –

+0

このソリューションはスレッドセーフではありません。それは生産に入る前に少しの作業が必要です。 – Enigmativity

9

私は単純にRefCount/Publishを使用します。私はIObservableを実装しているような気がします。

myColdObservable.Publish().RefCount(); 

これは、誰もが切断された後であなたの目に見える停止が脈動するようにします。ここではサンプルです:

var coldObservable = Observable 
    .Interval(TimeSpan.FromSeconds(1)) 
    .ObserveOn(Scheduler.TaskPool) 
    .Select(_ => DoSomething()); 

var refCountObs = coldObservable.Publish().RefCount(); 

CompositeDisposable d = new CompositeDisposable(); 
d.Add(refCountObs.Subscribe(n => Console.WriteLine("First got: " + n))); 
d.Add(refCountObs.Subscribe(n => Console.WriteLine("Second got: " + n))); 
d.Add(refCountObs.Subscribe(n => Console.WriteLine("Third got: " + n))); 

//Wait a bit for work to happen 
System.Threading.Thread.Sleep(10000); 

//Everyone unsubscribes 
d.Dispose(); 

//Observe that DoSomething is not called. 
System.Threading.Thread.Sleep(3000); 

これは、あなたが実際に加入者のを知りたいが、私は何の加入者が存在しない場合は、この作業を停止するあなたの要件に合うと思うケースをカバーしていません。

+0

この方法では、サブスクライバの数はわかりませんが、すべてのサブスクライバが終了した時点でソースを監視することはできません。あなた自身の観測値を実装するよりもはるかに優れています。 – Enigmativity

+0

これは最良の回答です –

3

一般に、IObservableは実装しないでください。通常、Rxには、直接的にも構図でも、あなたを助けることができるようなものがあります。 IObservableを実装する必要がある場合は、Observable.Createを使用してオブザーバー契約などに必要な保証をすべて取得します。

PublishとRefCountを使用することの提案は、あなたが探しています。なんらかの理由で自分自身を数えたい場合は、Observable.Deferを使用してObservableを使ってサブスクリプションを傍受します。最後に、シーケンス終端を傍受します。または、Observable.Createでソースをラップし、オブザーバをラップされたシーケンスに転送し、返されたIDisposableを(Disposable.Createを使用して)カウントロジックでラップします。

乾杯、

-Bart(Rxのチーム)古いものの

4

ビットが、私は、私は加入者数を知るために必要な問題を抱えていたように私はこの記事に出くわしました。 Bartの提案を使って、私はこの拡張を思いついた。

public static IObservable<T> CountSubscribers<T>(this IObservable<T> source, Action<int> countChanged) 
{ 
int count = 0; 

return Observable.Defer(() => 
{ 
    count = Interlocked.Increment(ref count); 
    countChanged(count); 
    return source.Finally(() => 
    { 
     count = Interlocked.Decrement(ref count); 
     countChanged(count); 
    }); 
}); 
} 
関連する問題