2016-04-14 4 views
0

RXを使用して多数のパブリッシャから通知を送信するシステムをモデル化しようとしています。RXで複数のカスタムオブザーバをマージする

ITopicObservableとITopicObserverの2つのカスタムインターフェイスを使用して、IObservableインターフェイスとIObserverインターフェイスを除いて、実装クラスが他のプロパティとメソッドを持つことをモデル化しました。

問題は私が考えているのは、複数のオブザーバブルを一緒にマージし、オブザーバに登録して、すべてのマージされたオブザーバからの更新を提供できるということです。しかし、 "issue"コメントのコードでは、無効なキャスト例外がスローされます。

ユースケースは、ボックス内の温度をそれぞれ監視する複数の独立したセンサーです。たとえば、すべてのレポートを1つの温度レポートに集約し、その後温度ヘルスモニタによってサブスクライブします。

私はここで何が欠けていますか?または、RXを使用してシナリオを実装するより良い方法はありますか?

using System; 
using System.Reactive.Linq; 
using System.Collections.Generic; 

namespace test 
{ 
class MainClass 
{ 
    public static void Main (string[] args) 
    { 
     Console.WriteLine ("Hello World!"); 
     var to = new TopicObserver(); 
     var s = new TopicObservable ("test"); 

     var agg = new AggregatedTopicObservable(); 
     agg.Add (s); 

     agg.Subscribe (to); 
    } 
} 

public interface ITopicObservable<TType>:IObservable<TType> 
{ 
    string Name{get;} 
} 

public class TopicObservable:ITopicObservable<int> 
{ 
    public TopicObservable(string name) 
    { 
     Name = name; 
    } 
    #region IObservable implementation 
    public IDisposable Subscribe (IObserver<int> observer) 
    { 
     return null; 
    } 
    #endregion 
    #region ITopicObservable implementation 
    public string Name { get;private set;} 

    #endregion 
} 

public class AggregatedTopicObservable:ITopicObservable<int> 
{ 
    List<TopicObservable> _topics; 
    private ITopicObservable<int> _observable; 
    private IDisposable _disposable; 

    public AggregatedTopicObservable() 
    { 
     _topics = new List<TopicObservable>(); 
    } 

    public void Add(ITopicObservable<int> observable) 
    { 
     _topics.Add ((TopicObservable)observable); 
    } 

    #region IObservable implementation 
    public IDisposable Subscribe (IObserver<int> observer) 
    { 
     _observable = (ITopicObservable<int>)_topics.Merge(); 

     _disposable = _observable.Subscribe(observer); 

     return _disposable; 
    } 
    #endregion 
    #region ITopicObservable implementation 
    public string Name { get;private set;} 
    #endregion 

} 



public interface ITopicObserver<TType>:IObserver<TType> 
{ 
    string Name{get;} 
} 

public class TopicObserver:ITopicObserver<int> 
{ 
    #region IObserver implementation 
    public void OnNext (int value) 
    { 
     Console.WriteLine ("next {0}", value); 
    } 
    public void OnError (Exception error) 
    { 
     Console.WriteLine ("error {0}", error.Message); 
    } 
    public void OnCompleted() 
    { 
     Console.WriteLine ("finished"); 
    } 
    #endregion 
    #region ITopicObserver implementation 
    public string Name { get;private set;} 
    #endregion 

} 
} 

答えて

1

使用している.Merge(...)オペレータの署名があるがあります:

IObservable<TSource> Merge<TSource>(this IEnumerable<IObservable<TSource>> sources) 

この.Merge()によって返された実際の型は次のとおりです。

System.Reactive.Linq.ObservableImpl.Merge`1[System.Int32] 

... (ITopicObservable<int>)_topics.Merge();を呼び出すと失敗することは明らかです。

IObservable<>またはIObserver<>のいずれも実装していないLeeのアドバイスが正しいです。上記のようなエラーが発生します。

public interface ITopic 
{ 
    string Name { get; } 
} 

public interface ITopicObservable<TType> : ITopic, IObservable<TType> 
{ } 

public interface ITopicSubject<TType> : ISubject<TType>, ITopicObservable<TType> 
{ } 

public interface ITopicObserver<TType> : ITopic, IObserver<TType> 
{ } 

public class Topic 
{ 
    public string Name { get; private set; } 

    public Topic(string name) 
    { 
     this.Name = name; 
    } 
} 

public class TopicSubject : Topic, ITopicSubject<int> 
{ 
    private Subject<int> _subject = new Subject<int>(); 

    public TopicSubject(string name) 
     : base(name) 
    { } 

    public IDisposable Subscribe(IObserver<int> observer) 
    { 
     return _subject.Subscribe(observer); 
    } 

    public void OnNext(int value) 
    { 
     _subject.OnNext(value); 
    } 

    public void OnError(Exception error) 
    { 
     _subject.OnError(error); 
    } 

    public void OnCompleted() 
    { 
     _subject.OnCompleted(); 
    } 
} 

public class AggregatedTopicObservable : Topic, ITopicObservable<int> 
{ 
    List<ITopicObservable<int>> _topics = new List<ITopicObservable<int>>(); 

    public AggregatedTopicObservable(string name) 
     : base(name) 
    { } 

    public void Add(ITopicObservable<int> observable) 
    { 
     _topics.Add(observable); 
    } 

    public IDisposable Subscribe(IObserver<int> observer) 
    { 
     return _topics.Merge().Subscribe(observer); 
    } 
} 

public class TopicObserver : Topic, ITopicObserver<int> 
{ 
    private IObserver<int> _observer; 

    public TopicObserver(string name) 
     : base(name) 
    { 
     _observer = 
      Observer 
       .Create<int>(
        value => Console.WriteLine("next {0}", value), 
        error => Console.WriteLine("error {0}", error.Message), 
        () => Console.WriteLine("finished")); 
    } 

    public void OnNext(int value) 
    { 
     _observer.OnNext(value); 
    } 
    public void OnError(Exception error) 
    { 
     _observer.OnError(error); 
    } 
    public void OnCompleted() 
    { 
     _observer.OnCompleted(); 
    } 
} 

し、それを実行します:あなたはこのような何かをしなければならなかった場合、私はそれをこのように行うだろう

var to = new TopicObserver("watching"); 
var ts1 = new TopicSubject("topic 1"); 
var ts2 = new TopicSubject("topic 2"); 

var agg = new AggregatedTopicObservable("agg"); 

agg.Add(ts1); 
agg.Add(ts2); 

agg.Subscribe(to); 

ts1.OnNext(42); 
ts1.OnCompleted(); 

ts2.OnNext(1); 
ts2.OnCompleted(); 

与える:

 
next 42 
next 1 
finished 

しかし、私はそれがどのように役立つか分からない名前をすべて与えることができることを離れて、あなたはいつもこれを行うことができます:

var to = 
    Observer 
     .Create<int>(
      value => Console.WriteLine("next {0}", value), 
      error => Console.WriteLine("error {0}", error.Message), 
      () => Console.WriteLine("finished")); 

var ts1 = new Subject<int>(); 
var ts2 = new Subject<int>(); 

var agg = new [] { ts1, ts2 }.Merge(); 

agg.Subscribe(to); 

ts1.OnNext(42); 
ts1.OnCompleted(); 

ts2.OnNext(1); 
ts2.OnCompleted(); 

インターフェイスとクラスを使用しない場合と同じ出力です。

さらに興味深い方法があります。これを試してみてください:

var to = 
    Observer 
     .Create<int>(
      value => Console.WriteLine("next {0}", value), 
      error => Console.WriteLine("error {0}", error.Message), 
      () => Console.WriteLine("finished")); 

var agg = new Subject<IObservable<int>>(); 

agg.Merge().Subscribe(to); 

var ts1 = new Subject<int>(); 
var ts2 = new Subject<int>(); 

agg.OnNext(ts1); 
agg.OnNext(ts2); 

ts1.OnNext(42); 
ts1.OnCompleted(); 

ts2.OnNext(1); 
ts2.OnCompleted(); 

var ts3 = new Subject<int>(); 

agg.OnNext(ts3); 

ts3.OnNext(99); 
ts3.OnCompleted(); 

これが生成します。

 
next 42 
next 1 
next 99 

をそれはあなたがマージした後、新しいソース観測を追加することができます!

+0

ありがとうございました!インターフェースの使用に関する質問に答えるために、たとえば、システムのさまざまな部分を監視する多数の温度プローブがあります。これらのプローブのそれぞれは固有のID /名前を持っています。温度をファイルに独立して記録します。アグリゲーショントピックスが入っているシステム全体を効果的に監視する集約温度モニタがあります。アグリゲーショントピックスの情報に基づいて、オン/オフを切り替える意思決定を行う別のエンティティ(オブザーバ)もあります。このすべてはログに記録されます。これを達成するためのよりクリーンな方法がありますか? – Bernard

+0

@Bernard - はい、もっと簡単な方法があります。インターフェイスを拡張しないでください。代わりに、生成されたID、名前、および値を含むカスタムオブジェクトを作成する必要があります。次に、このカスタムオブジェクトを返すオブザーバブルを作成します。これは非常に簡単にマージできます。あなたは本当にこれを別の質問として尋ねるべきです。私はあなたに答えを与えることを嬉しく思っています。もしあなたがすれば、温度プローブが値を出す時を知る方法を詳しく教えてください(コードも示しています)。 – Enigmativity

+0

私は新しい質問をhttp://stackoverflow.com/questions/36723106/building-a-sensor-monitoring-system-using-rxとして作成しました。助けてくれてありがとう! – Bernard

2

私が最初に考え、以下の

コードは、あなたがIObservable<T>を実装してはならないということです、あなたは、プロパティやメソッドの結果として、それを暴露することによって、それを構成する必要があります。

2番目の考えは、複数のシーケンスをマージ/集約することに優れたRxの演算子があることです。 あなたはそれらを使うのが好きでしょう。最初に似て

第三に、あなたは一般的にIObserver<T>を実装していない、あなただけの観察可能なシーケンスを購読すると(OnNextOnErrorOnComplete)をコールバックごとにデリゲートを提供

だからあなたのコードは基本的TopicListener(string)IObservable<T>を返すだけの方法である

Console.WriteLine("Hello World!"); 
var topic1 = TopicListener("test1"); 
var topic2 = TopicListener("test2"); 

topic1.Merge(topic2) 
    .Subscribe(
    val => { Console.WriteLine("One of the topics published this value {0}", val);}, 
    ex => { Console.WriteLine("One of the topics errored. Now the whole sequence is dead {0}", ex);}, 
    () => {Console.WriteLine("All topics have completed.");}); 

に縮小されています。 TopicListener(string)メソッドの実装では、おそらくObservable.Createが使用されます。

TopicベースのメッセージングシステムでRxをマッピングする例がわかります。 あなたはTIBRVのトピック上のRxレイヤことができる方法の例ここではhttps://github.com/LeeCampbell/RxCookbook/blob/master/IO/Comms/TibRvSample.linq

+0

ありがとうございます!私は明確な答えを感謝しますが、私は@エニグマティビティはもう少し説明的だったと思います。 – Bernard

+0

はい、彼の答えには素晴らしい詳細があります。しかし、それらの '科目'に疲れて! –

+0

Campell、何が欠点ですか? – Bernard

関連する問題