2016-06-27 12 views
1

私は、同じソースへのサブスクリプションを持つ2つの単純な観測ハンドラを持っています。ただし、両方のサブスクリプションは異なるタイプで動作します。私は彼らに観察可能な情報源(Subject())の順序を保持してもらいたい。私はSynchronize()拡張で試しましたが、期待通りにこの作業を行う方法が見つかりませんでした。テストコードのオブザーバブルとオフロードUIの同期方法スレッド

[Test] 
public void TestObserveOn() 
{ 
    Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 
    var source = new Subject<object>(); 
    var are = new AutoResetEvent(false); 

    using (source.ObserveOn(TaskPoolScheduler.Default).Synchronize(source).OfType<int>().Subscribe(
     o => 
      { 
       Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o); 
       int sleep = 3000/o; // just to simulate longer processing 
       Thread.Sleep(sleep); 
       Console.WriteLine("Handled {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o); 
      }, 
     () => 
      { 
       Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 
       are.Set(); 
      })) 
    using (source.ObserveOn(TaskPoolScheduler.Default).Synchronize(source).OfType<double>().Subscribe(
        o => 
        { 
         Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o); 
         Console.WriteLine("Handled {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o); 
        }, 
        () => 
        { 
         Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 
        })) 
    { 
     Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 

     source.OnNext(1); 
     source.OnNext(1.1); 
     source.OnNext(2); 
     source.OnNext(2.1); 
     source.OnNext(3); 
     source.OnNext(3.1); 
     source.OnCompleted(); 

     Console.WriteLine("Finished on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 

     are.WaitOne(); 
    } 
} 

結果の出力:

Starting on threadId:10 
Subscribed on threadId:10 
Finished on threadId:10 
Received 1 on threadId:11 
Handled 1 on threadId: 11 
Received 1,1 on threadId:12 
Handled 1,1 on threadId: 12 
Received 2,1 on threadId:12 
Handled 2,1 on threadId: 12 
Received 3,1 on threadId:12 
Handled 3,1 on threadId: 12 
Received 2 on threadId:11 
Handled 2 on threadId: 11 
OnCompleted on threadId:12 
Received 3 on threadId:11 
Handled 3 on threadId: 11 
OnCompleted on threadId:11 

あなたは順序が入力に異なっている見ることができるように

は、ここに私のユニットテストコードです。両方のサブスクリプションを同期して、入力と同じ順序にする必要があります。

出力は

Starting on threadId:10 
Subscribed on threadId:10 
Finished on threadId:10 
Received 1 on threadId:11 
Handled 1 on threadId: 11 
Received 1,1 on threadId:12 
Handled 1,1 on threadId: 12 
Received 2 on threadId:11 
Handled 2 on threadId: 11 
Received 2,1 on threadId:12 
Handled 2,1 on threadId: 12 
Received 3 on threadId:11 
Handled 3 on threadId: 11 
Received 3,1 on threadId:12 
Handled 3,1 on threadId: 12 
OnCompleted on threadId:11 
OnCompleted on threadId:12 

(完了順序は、私にとってそれは重要ではありません)でなければなりません。

EDIT:

私も次のことを試してみました:

[Test] 
public void TestObserveOn() 
{ 
    Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 
    var source = new Subject<object>(); 
    var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair(); 
    var exclusiveTaskFactory = new TaskFactory(taskSchedulerPair.ExclusiveScheduler); 
    var exclusiveScheduler = new TaskPoolScheduler(exclusiveTaskFactory); 
    var are = new AutoResetEvent(false); 

    using (source.ObserveOn(exclusiveScheduler).OfType<int>().Subscribe(
     o => 
      { 
       Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o); 
       int sleep = 3000/o; 
       Thread.Sleep(sleep); 
       Console.WriteLine("Handled {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o); 
      }, 
     () => 
      { 
       Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 
       are.Set(); 
      })) 
    using (source.ObserveOn(exclusiveScheduler).OfType<double>().Subscribe(
        o => 
        { 
         Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o); 
         Console.WriteLine("Handled {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o); 
        }, 
        () => 
        { 
         Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 
         are.Set(); 
        })) 
    { 
     Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 

     source.OnNext(1); 
     source.OnNext(1.1); 
     source.OnNext(2); 
     source.OnNext(2.1); 
     source.OnNext(3); 
     source.OnNext(3.1); 
     source.OnCompleted(); 

     Console.WriteLine("Finished on threadId:{0}", Thread.CurrentThread.ManagedThreadId); 

     are.WaitOne(); 
     are.WaitOne(); 
    } 
} 

しかし、出力はまだ間違っている:

Starting on threadId:10 
Subscribed on threadId:10 
Finished on threadId:10 
Received 1 on threadId:4 
Handled 1 on threadId: 4 
Received 2 on threadId:4 
Handled 2 on threadId: 4 
Received 3 on threadId:4 
Handled 3 on threadId: 4 
OnCompleted on threadId:4 
Received 1,1 on threadId:4 
Handled 1,1 on threadId: 4 
Received 2,1 on threadId:4 
Handled 2,1 on threadId: 4 
Received 3,1 on threadId:4 
Handled 3,1 on threadId: 4 
OnCompleted on threadId:4 

...あなたはそれがでない見ることができるようにOnNext()呼び出しの順序

これは、作成のような意味を持つ型を使用し、その後いくつかの更新を行う場合に特に重要です...更新が作成前であればどうしますか?順序が保証されていない場合は、問題が発生している可能性があります。または、先行操作が変更する状態と同期するまで、「将来の」イベントをキューに入れる必要があります。 これを注文基準として使用し、「穴」を見つけて、後続の列が再び並ぶまでキューに入れるには、バージョン/注文番号が増えるようなものが必要です。

第二EDIT ...私の問題に、より近いものとテストケース論から抜け出すために:

を私はRXフィルタリング機能で使いやすいシンプルなインターフェイスをしたいです

public interface ICommandBus // or to say Aggregator pattern 
{ 
    void Send<T>(T command) where T : ICommand; // might be something like Task<Result> Send<T>(T command) to know the system has accepted the command 

    IObservable<T> Stream<T>() where T : ICommand; 
} 

public class CommandBus : ICommandBus, IDisposable 
{ 
    private static readonly ILog Log = LogManager.GetLogger<CommandBus>(); 

    private readonly HashSet<Type> registrations = new HashSet<Type>(); 

    private readonly Subject<ICommand> stream = new Subject<ICommand>(); 

    private readonly IObservable<ICommand> notifications; 

    private bool disposed; 

    public CommandBus() 
    { 
     // hmm, this is a problem!? how to sync? 
     this.notifications = this.stream.SubscribeOn(TaskPoolScheduler.Default); 

    } 

    public IObservable<T> Stream<T>() where T : ICommand 
    { 
     var observable = this.notifications.OfType<T>(); 
     return new ExclusiveObservableWrapper<T>(
      observable, 
      t => this.registrations.Add(t), 
      t => this.registrations.Remove(t)); 
    } 

    public void Send<T>(T command) where T : ICommand 
    { 
     if (command == null) 
     { 
      throw new ArgumentNullException("command"); 
     } 

     if (!this.registrations.Contains(typeof(T))) 
     { 
      throw new NoCommandHandlerSubscribedException(); 
     } 

     Log.Debug(logm => logm("Sending command of type {0}.", typeof(T).Name)); 

     this.stream.OnNext(command); 
    } 

    //public async Task SendAsync<T>(T command) where T : ICommand 
    //{ 
    // if (command == null) 
    // { 
    //  throw new ArgumentNullException("command"); 
    // } 

    // if (!this.registrations.Contains(typeof(T))) 
    // { 
    //  throw new NoCommandHandlerSubscribedException(); 
    // } 

    // Log.Debug(logm => logm("Sending command of type {0}.", typeof(T))); 

    // this.stream.OnNext(command); 

    // await this.stream.Where(item => ReferenceEquals(item, command)); 
    //} 

    public void Dispose() 
    { 
     this.Dispose(true); 
     GC.SuppressFinalize(this); 
    } 

    protected virtual void Dispose(bool disposing) 
    { 
     if (!this.disposed) 
     { 
      if (disposing) 
      { 
       this.stream.Dispose(); 
      } 
     } 

     this.disposed = true; 
    } 

    [Serializable] 
    public class CommandAlreadySubscribedException : Exception 
    { 
     internal CommandAlreadySubscribedException(Type type) 
      : base(string.Format("Tried to subscribe handler for command of type {0} but there was already a subscribtion. More than one handler at time is not allowed.", type)) 
     { 
     } 

     protected CommandAlreadySubscribedException(SerializationInfo info, StreamingContext context) 
      : base(info, context) 
     { 
     } 
    } 

    [Serializable] 
    public class NoCommandHandlerSubscribedException : Exception 
    { 
     public NoCommandHandlerSubscribedException() 
     { 
     } 

     public NoCommandHandlerSubscribedException(string message) 
      : base(message) 
     { 
     } 

     public NoCommandHandlerSubscribedException(string message, Exception innerException) 
      : base(message, innerException) 
     { 
     } 

     protected NoCommandHandlerSubscribedException(SerializationInfo info, StreamingContext context) 
      : base(info, context) 
     { 
     } 
    } 

    private class ExclusiveObservableWrapper<T> : IObservable<T> where T : ICommand 
    { 
     private readonly IObservable<T> observable; 

     private readonly Func<Type, bool> register; 

     private readonly Action<Type> unregister; 

     internal ExclusiveObservableWrapper(IObservable<T> observable, Func<Type, bool> register, Action<Type> unregister) 
     { 
      this.observable = observable; 
      this.register = register; 
      this.unregister = unregister; 
     } 

     public IDisposable Subscribe(IObserver<T> observer) 
     { 
      var subscription = this.observable.Subscribe(observer); 
      var type = typeof(T); 

      if (!this.register(type)) 
      { 
       observer.OnError(new CommandAlreadySubscribedException(type)); 
      } 

      return Disposable.Create(
       () => 
       { 
        subscription.Dispose(); 
        this.unregister(type); 
       }); 
     } 
    } 
} 

コマンドが指定された順序であると保証できない場合は、意味がありません。 (作成前に更新)

ICommandBusは、コマンドの対応するハンドラを呼び出すUI /プレゼンテーションレイヤーから使用されます(ハンドラを知る必要はありません)。

チェーンを別のスレッドにオフロードするだけです。

コマンド - >バス - >コマンドハンドラ - >ドメインモデル - >イベント - >イベントハンドラ - >読むモデル

これは出現順にコマンドを維持する必要があります。

私は、RXがちょっとした「魔法の行」でこれを行うことができると思いました。しかし、私が今見ている限り、私は自分のスレッド処理でもう一度やり直さなければなりません。 :-(

+0

RXに新しいです。私はオフロードのためにそれを使用する方法があることを望んだが、複数の加入者であってもシリーズを保つことを望んだ。 – Beachwalker

+0

並行性を可能にするスケジューラーを使用している場合はありません。代わりに 'EventLoopScheduler'を試してみてください。あなたの助けには – Enigmativity

答えて