6

私は様々な方法で観測可能なものを作り、利害関係者に返信しますが、聞いていると、観測可能なものを分解して、リソースを消費し続けることができません。 pubサブシステムでトピックを作成することと考える方法もあります。誰もトピックにそれ以上購読していないときは、トピックとそれ以上のフィルタリングを保持したくありません。最後のオブザーバーが退会したときにイベントの公開を停止するRx observableを作成するにはどうしたらいいですか?

答えて

10

Rxは、すでにあなたのニーズに合わせてオペレータがある - も2実際に - Publish & RefCount

は、ここでそれらを使用する方法は次のとおりです。

IObservable xs = ... 

var rxs = xs.Publish().RefCount(); 

var sub1 = rxs.Subscribe(x => { }); 
var sub2 = rxs.Subscribe(x => { }); 

//later 
sub1.Dispose(); 

//later 
sub2.Dispose(); 

//The underlying subscription to `xs` is now disposed of. 

シンプル。

1

あなたの質問を理解している場合は、すべてのサブスクライバがサブスクリプションを廃棄した、つまりサブスクライバがなくなったときに、オブザーバブルが作成から停止するようなクリーンアップ機能を実行する。 これは、あなたが、あなたは以下のような何かを行うことができます欲しいものである場合:

//Wrap a disposable 
public class WrapDisposable : IDisposable 
    { 
     IDisposable disp; 
     Action act; 
     public WrapDisposable(IDisposable _disp, Action _act) 
     { 
      disp = _disp; 
      act = _act; 
     } 
     void IDisposable.Dispose() 
     { 
      act(); 
      disp.Dispose(); 
     } 
    } 

    //Observable that we want to clean up after all subs are done 
    public static IObservable<long> GenerateObs(out Action cleanup) 
    { 
     cleanup =() => 
     { 
      Console.WriteLine("All subscribers are done. Do clean up"); 
     }; 
     return Observable.Interval(TimeSpan.FromSeconds(1)); 
    } 
    //Wrap the observable 
    public static IObservable<T> WrapToClean<T>(IObservable<T> obs, Action onAllDone) 
    { 
     int count = 0; 
     return Observable.CreateWithDisposable<T>(ob => 
     { 
      var disp = obs.Subscribe(ob); 
      Interlocked.Increment(ref count); 
      return new WrapDisposable(disp,() => 
      { 
       if (Interlocked.Decrement(ref count) == 0) 
       { 
        onAllDone();             
       } 
      }); 
     }); 
    } 

//使用例:

Action cleanup; 
var obs = GenerateObs(out cleanup); 
var newObs = WrapToClean(obs, cleanup); 
newObs.Take(6).Subscribe(Console.WriteLine); 
newObs.Take(5).Subscribe(Console.WriteLine); 
関連する問題