2011-12-13 8 views
3

私は、観測可能な複数のイベントを、Rxを使って1組のイベントにどのようにストリーミングできるかを調べようとしています。しかし、以下のコードを実行すると例外が発生します。これは、Rx文法違反のために複数のオブザーバが常に例外になりやすいということですか?つまり、複数の観測者のうちの2人が同時に偶然にイベントを生成した場合(いずれの2つの観測も同時に発生する可能性があります)、例外が発生します。観測者はRxで複数の観測値を安全に聞くことができますか?

DateTimeOffset start; 
     object sync = new object(); 
     var subject = new Subject<long>(); 
     var observer = Observer.Create<long>(c => 
     { 
      lock (sync) 
      { 
       Console.WriteLine(c); 
      } 
     }) 
      ; 

     var observable1 = Observable.Interval(TimeSpan.FromSeconds(2)); 
     var observable2 = Observable.Interval(TimeSpan.FromSeconds(5)); 
     var observable3 = Observable.Never<long>().Timeout 
      (start = DateTimeOffset.Now.AddSeconds(15), 
      (new long[] { 1 }).ToObservable()); 
     var observable4 = Observable.Never<long>().Timeout(start); 
     observable1.Subscribe(observer); 
     observable2.Subscribe(observer); 
     observable3.Subscribe(observer); 
     observable4.Subscribe(observer); 
     Thread.Sleep(20000); 

ギデオンの説明に感謝します。これは私が得ている例外です。あなたは時間切れの例外であるということは間違いありません。これはコーディングミスでした。ありがとう。

System.TimeoutException: The operation has timed out. 
    at System.Reactive.Observer.<Create>b__8[T](Exception e) 
    at System.Reactive.AnonymousObserver`1.Error(Exception exception) 
    at System.Reactive.AbstractObserver`1.OnError(Exception error) 
    at System.Reactive.Subjects.Subject`1.OnError(Exception error) 
    at System.Reactive.AnonymousObservable`1.AutoDetachObserver.Error(Exception e 
xception) 
    at System.Reactive.AbstractObserver`1.OnError(Exception error) 
    at System.Reactive.AnonymousObservable`1.AutoDetachObserver.Error(Exception e 
xception) 
    at System.Reactive.AbstractObserver`1.OnError(Exception error) 
    at System.Reactive.Linq.Observable.<>c__DisplayClass28c`1.<>c__DisplayClass28 
e.<Throw>b__28b() 
    at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action 
action) 
    at System.Reactive.Concurrency.ImmediateScheduler.Schedule[TState](TState sta 
te, Func`3 action) 
    at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Actio 
n action) 
    at System.Reactive.Linq.Observable.<>c__DisplayClass28c`1.<Throw>b__28a(IObse 
rver`1 observer) 
    at System.Reactive.AnonymousObservable`1.<>c__DisplayClass1.<Subscribe>b__0() 

    at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action 
action) 
    at System.Reactive.Concurrency.ScheduledItem`2.InvokeCore() 
    at System.Reactive.Concurrency.ScheduledItem`1.Invoke() 
    at System.Reactive.Concurrency.CurrentThreadScheduler.Trampoline.Run() 
    at System.Reactive.Concurrency.CurrentThreadScheduler.Schedule[TState](TState 
state, TimeSpan dueTime, Func`3 action) 
    at System.Reactive.Concurrency.CurrentThreadScheduler.Schedule[TState](TState 
state, Func`3 action) 
    at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Actio 
n action) 
    at System.Reactive.AnonymousObservable`1.Subscribe(IObserver`1 observer) 
    at System.Reactive.Linq.Observable.<>c__DisplayClass543`1.<>c__DisplayClass54 
5.<Timeout>b__53f() 
    at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action 
action) 
    at System.Reactive.Concurrency.ThreadPoolScheduler.<>c__DisplayClass8`1.<Sche 
dule>b__6(Object _) 
    at System.Threading._TimerCallback.TimerCallback_Context(Object state) 
    at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, C 
ontextCallback callback, Object state, Boolean ignoreSyncCtx) 
    at System.Threading._TimerCallback.PerformTimerCallback(Object state) 

答えて

5

はい、オブザーバは複数のオブザーバを聴くことができます。これの最も良い例は、Merge演算子です。ビルトイン演算子はすべてRX文法に従い、そうでないソースに対しては頻繁に実行します。

IObserverから取得する場合はObserver.Createです。 OnErrorまたはOnCompletedが呼び出されると、それ以降のOnNextの呼び出しは無視されます。これは、同じオブザーバを使用して1つのオブザーバブルにサブスクライブし、その後に別のオブザーバブルを使用すると、最初のオブザーバブルからの終了メッセージがオブザーバに2番目のオブザーバブルからのメッセージを無視させるため、動作しないことを意味します。これを回避するために、Merge,Concat、およびOnErrorResumeNextなどの演算子は、複数のオブザーバを内部的に使用し、最後に観測可能なものから外側に完了メッセージ(オペレータのセマンティクスに応じてOnErrorおよび/またはOnCompleted)を渡しません観察者。

あなたは何の例外が発生しているかは言及していませんでしたが、私はそれがobservable4から取得しているタイムアウトのエラーであると推測します。タイムアウトに使用する別の観測値を指定しない場合は、オブザーバのOnErrorが呼び出され、SubscribeObserver.CreateのデフォルトのOnErrorはエラーハンドラを使用しないオーバーロードで例外をスローするだけです。

これは明らかにサンプルコード/テストコードですが、メッセージがもはやOnNextに渡されなくても、他のすべての観測値はこの例外の後も実行され続けていることを指摘したいと思います。 Mergeを使用してこれを追跡するか、説明からすべてのディスポーザブルを追跡し、完了メッセージが届いたら自分で処分してください。 CompositeDisposableSystem.Reactive.Disposables)がこれに適しています。

2

あなたが本当にここにロックを使用すべきではありませんが、あなたは本当にこれが仕事をしたい場合は、あなたが行うことができます:

var x = Observable.Create<T>(subj => { /* Fill it in*/ }) 
    .Multicast(new Subject<T>()); 

// Set up your subscriptions Here! 

// When you call the Connect, whatever is in the Observable.Create will be called 
x.Connect(); 

あなたも、より安全になりたいと思った場合、あなたはそれをするように作ることができますSubjectの代わりにReplaySubjectを使用することで、Createの結果が今後のサブスクリプションのために再生されます(のサブジェクトでは、の後にConnectには何も取得されません)

関連する問題