私は、同じソースへのサブスクリプションを持つ2つの単純な観測ハンドラを持っています。ただし、両方のサブスクリプションは異なるタイプで動作します。私は彼らに観察可能な情報源(Subject())の順序を保持してもらいたい。私はSynchronize()拡張で試しましたが、期待通りにこの作業を行う方法が見つかりませんでした。テストコードのオブザーバブルとオフロードUIの同期方法スレッド
[Test]
public void TestObserveOn()
{
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
var source = new Subject<object>();
var are = new AutoResetEvent(false);
using (source.ObserveOn(TaskPoolScheduler.Default).Synchronize(source).OfType<int>().Subscribe(
o =>
{
Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
int sleep = 3000/o; // just to simulate longer processing
Thread.Sleep(sleep);
Console.WriteLine("Handled {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
},
() =>
{
Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
are.Set();
}))
using (source.ObserveOn(TaskPoolScheduler.Default).Synchronize(source).OfType<double>().Subscribe(
o =>
{
Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
Console.WriteLine("Handled {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
},
() =>
{
Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
}))
{
Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
source.OnNext(1);
source.OnNext(1.1);
source.OnNext(2);
source.OnNext(2.1);
source.OnNext(3);
source.OnNext(3.1);
source.OnCompleted();
Console.WriteLine("Finished on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
are.WaitOne();
}
}
結果の出力:
Starting on threadId:10
Subscribed on threadId:10
Finished on threadId:10
Received 1 on threadId:11
Handled 1 on threadId: 11
Received 1,1 on threadId:12
Handled 1,1 on threadId: 12
Received 2,1 on threadId:12
Handled 2,1 on threadId: 12
Received 3,1 on threadId:12
Handled 3,1 on threadId: 12
Received 2 on threadId:11
Handled 2 on threadId: 11
OnCompleted on threadId:12
Received 3 on threadId:11
Handled 3 on threadId: 11
OnCompleted on threadId:11
あなたは順序が入力に異なっている見ることができるように
は、ここに私のユニットテストコードです。両方のサブスクリプションを同期して、入力と同じ順序にする必要があります。
出力は
Starting on threadId:10
Subscribed on threadId:10
Finished on threadId:10
Received 1 on threadId:11
Handled 1 on threadId: 11
Received 1,1 on threadId:12
Handled 1,1 on threadId: 12
Received 2 on threadId:11
Handled 2 on threadId: 11
Received 2,1 on threadId:12
Handled 2,1 on threadId: 12
Received 3 on threadId:11
Handled 3 on threadId: 11
Received 3,1 on threadId:12
Handled 3,1 on threadId: 12
OnCompleted on threadId:11
OnCompleted on threadId:12
(完了順序は、私にとってそれは重要ではありません)でなければなりません。
EDIT:
私も次のことを試してみました:
[Test]
public void TestObserveOn()
{
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
var source = new Subject<object>();
var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();
var exclusiveTaskFactory = new TaskFactory(taskSchedulerPair.ExclusiveScheduler);
var exclusiveScheduler = new TaskPoolScheduler(exclusiveTaskFactory);
var are = new AutoResetEvent(false);
using (source.ObserveOn(exclusiveScheduler).OfType<int>().Subscribe(
o =>
{
Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
int sleep = 3000/o;
Thread.Sleep(sleep);
Console.WriteLine("Handled {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
},
() =>
{
Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
are.Set();
}))
using (source.ObserveOn(exclusiveScheduler).OfType<double>().Subscribe(
o =>
{
Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
Console.WriteLine("Handled {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
},
() =>
{
Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
are.Set();
}))
{
Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
source.OnNext(1);
source.OnNext(1.1);
source.OnNext(2);
source.OnNext(2.1);
source.OnNext(3);
source.OnNext(3.1);
source.OnCompleted();
Console.WriteLine("Finished on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
are.WaitOne();
are.WaitOne();
}
}
しかし、出力はまだ間違っている:
Starting on threadId:10
Subscribed on threadId:10
Finished on threadId:10
Received 1 on threadId:4
Handled 1 on threadId: 4
Received 2 on threadId:4
Handled 2 on threadId: 4
Received 3 on threadId:4
Handled 3 on threadId: 4
OnCompleted on threadId:4
Received 1,1 on threadId:4
Handled 1,1 on threadId: 4
Received 2,1 on threadId:4
Handled 2,1 on threadId: 4
Received 3,1 on threadId:4
Handled 3,1 on threadId: 4
OnCompleted on threadId:4
...あなたはそれがでない見ることができるようにOnNext()呼び出しの順序
これは、作成のような意味を持つ型を使用し、その後いくつかの更新を行う場合に特に重要です...更新が作成前であればどうしますか?順序が保証されていない場合は、問題が発生している可能性があります。または、先行操作が変更する状態と同期するまで、「将来の」イベントをキューに入れる必要があります。 これを注文基準として使用し、「穴」を見つけて、後続の列が再び並ぶまでキューに入れるには、バージョン/注文番号が増えるようなものが必要です。
第二EDIT ...私の問題に、より近いものとテストケース論から抜け出すために:
:を私はRXフィルタリング機能で使いやすいシンプルなインターフェイスをしたいです
public interface ICommandBus // or to say Aggregator pattern
{
void Send<T>(T command) where T : ICommand; // might be something like Task<Result> Send<T>(T command) to know the system has accepted the command
IObservable<T> Stream<T>() where T : ICommand;
}
public class CommandBus : ICommandBus, IDisposable
{
private static readonly ILog Log = LogManager.GetLogger<CommandBus>();
private readonly HashSet<Type> registrations = new HashSet<Type>();
private readonly Subject<ICommand> stream = new Subject<ICommand>();
private readonly IObservable<ICommand> notifications;
private bool disposed;
public CommandBus()
{
// hmm, this is a problem!? how to sync?
this.notifications = this.stream.SubscribeOn(TaskPoolScheduler.Default);
}
public IObservable<T> Stream<T>() where T : ICommand
{
var observable = this.notifications.OfType<T>();
return new ExclusiveObservableWrapper<T>(
observable,
t => this.registrations.Add(t),
t => this.registrations.Remove(t));
}
public void Send<T>(T command) where T : ICommand
{
if (command == null)
{
throw new ArgumentNullException("command");
}
if (!this.registrations.Contains(typeof(T)))
{
throw new NoCommandHandlerSubscribedException();
}
Log.Debug(logm => logm("Sending command of type {0}.", typeof(T).Name));
this.stream.OnNext(command);
}
//public async Task SendAsync<T>(T command) where T : ICommand
//{
// if (command == null)
// {
// throw new ArgumentNullException("command");
// }
// if (!this.registrations.Contains(typeof(T)))
// {
// throw new NoCommandHandlerSubscribedException();
// }
// Log.Debug(logm => logm("Sending command of type {0}.", typeof(T)));
// this.stream.OnNext(command);
// await this.stream.Where(item => ReferenceEquals(item, command));
//}
public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!this.disposed)
{
if (disposing)
{
this.stream.Dispose();
}
}
this.disposed = true;
}
[Serializable]
public class CommandAlreadySubscribedException : Exception
{
internal CommandAlreadySubscribedException(Type type)
: base(string.Format("Tried to subscribe handler for command of type {0} but there was already a subscribtion. More than one handler at time is not allowed.", type))
{
}
protected CommandAlreadySubscribedException(SerializationInfo info, StreamingContext context)
: base(info, context)
{
}
}
[Serializable]
public class NoCommandHandlerSubscribedException : Exception
{
public NoCommandHandlerSubscribedException()
{
}
public NoCommandHandlerSubscribedException(string message)
: base(message)
{
}
public NoCommandHandlerSubscribedException(string message, Exception innerException)
: base(message, innerException)
{
}
protected NoCommandHandlerSubscribedException(SerializationInfo info, StreamingContext context)
: base(info, context)
{
}
}
private class ExclusiveObservableWrapper<T> : IObservable<T> where T : ICommand
{
private readonly IObservable<T> observable;
private readonly Func<Type, bool> register;
private readonly Action<Type> unregister;
internal ExclusiveObservableWrapper(IObservable<T> observable, Func<Type, bool> register, Action<Type> unregister)
{
this.observable = observable;
this.register = register;
this.unregister = unregister;
}
public IDisposable Subscribe(IObserver<T> observer)
{
var subscription = this.observable.Subscribe(observer);
var type = typeof(T);
if (!this.register(type))
{
observer.OnError(new CommandAlreadySubscribedException(type));
}
return Disposable.Create(
() =>
{
subscription.Dispose();
this.unregister(type);
});
}
}
}
コマンドが指定された順序であると保証できない場合は、意味がありません。 (作成前に更新)
ICommandBusは、コマンドの対応するハンドラを呼び出すUI /プレゼンテーションレイヤーから使用されます(ハンドラを知る必要はありません)。
チェーンを別のスレッドにオフロードするだけです。
コマンド - >バス - >コマンドハンドラ - >ドメインモデル - >イベント - >イベントハンドラ - >読むモデル
これは出現順にコマンドを維持する必要があります。
私は、RXがちょっとした「魔法の行」でこれを行うことができると思いました。しかし、私が今見ている限り、私は自分のスレッド処理でもう一度やり直さなければなりません。 :-(
RXに新しいです。私はオフロードのためにそれを使用する方法があることを望んだが、複数の加入者であってもシリーズを保つことを望んだ。 – Beachwalker
並行性を可能にするスケジューラーを使用している場合はありません。代わりに 'EventLoopScheduler'を試してみてください。あなたの助けには – Enigmativity