2011-10-19 3 views
3

私はRxを初めて使っています。私はそれが別のスレッドで実行されるように異なる加入者にメッセージをディスパッチすることが可能かどうかを知りたいですか? IObserableはどのようにそれを制御できますか?私が理解しているように、単純なSubject実装は、1つのスレッド上で1つのサブスクライバを1つずつ呼び出します。Rx内の異なるスレッドでサブスクライバのOnNextを呼び出すことはできますか?


public class Subsciber : IObserver<int> 
{ 
    public void OnNext(int a) 
    { 
     // Do something 
    } 
    public void OnError(Exception e) 
    { 
     // Do something 
    } 
    public void OnCompeleted() 
    { 
    } 

} 

public static class Program 
{ 
    public void static Main() 
    { 
     var observable = new <....SomeClass....>(); 
     var sub1 = new Subscriber(); 
     var sub2 = new Subscriber(); 
     observable.Subscribe(sub1); 
     observable.Subscribe(sub2); 
     // some waiting function 
    } 
} 

私はSUB1のOnNext()が完了するまで、 '工ass'、その後、SUB2のOnNext()が呼び出されませんよう件名を使用する場合。 sub1に時間がかかる場合は、sub2の受信を遅らせることは望ましくありません。 RxがSomeClassのこの種の実装をどのように許可しているか教えてください。

+0

熱く見えるか冷たいですか? – Richard

答えて

7

あなたが書いたコードは、ほぼ同時に観察可能なものを実行するためのものです。あなたはこのようあなたのオブザーバを記述する場合:

public class Subscriber : IObserver<int> 
{ 
    public void OnNext(int a) 
    { 
     Console.WriteLine("{0} on {1} at {2}", 
      a, 
      Thread.CurrentThread.ManagedThreadId, 
      DateTime.Now.ToString()); 
    } 
    public void OnError(Exception e) 
    { } 
    public void OnCompleted() 
    { } 
} 

次に、このコードは実行されている:

var observable = 
    Observable 
     .Interval(TimeSpan.FromSeconds(1.0)) 
     .Select(x => (int)x) 
     .Take(5) 
     .ObserveOn(Scheduler.ThreadPool); 
var sub1 = new Subscriber(); 
var sub2 = new Subscriber(); 
observable.Subscribe(sub1); 
observable.Subscribe(sub2); 
Thread.Sleep(10000); 

すると、次が生成されます:

0 on 28 at 2011/10/20 00:13:49 
0 on 16 at 2011/10/20 00:13:49 
1 on 29 at 2011/10/20 00:13:50 
1 on 22 at 2011/10/20 00:13:50 
2 on 27 at 2011/10/20 00:13:51 
2 on 29 at 2011/10/20 00:13:51 
3 on 27 at 2011/10/20 00:13:52 
3 on 19 at 2011/10/20 00:13:52 
4 on 27 at 2011/10/20 00:13:53 
4 on 27 at 2011/10/20 00:13:53 

それはすでに別のスレッドで並行してサブスクリプションを実行しています。

私が使用した重要なことは、.ObserveOn拡張メソッドでした。これがこの作業をしました。

オブザーバーは、通常、オブザーバーの同じインスタンスを共有しないことに注意してください。オブザーバブルに加入することにより、オブザーバブルのソースからオブザーバまで、観察可能なオペレータの固有の「連鎖」が効果的に配線されます。これは、列挙型でGetEnumeratorを2回呼び出した場合と同じですが、同じ列挙型インスタンスを共有しないため、2つの一意のインスタンスが取得されます。

ここで、私はチェーンが意味することを説明したいと思います。 Reflector.NETで抽出されたコードをObservable.Generate & Observable.Whereから説明します。両方Generate & Where各々は、内部のRxクラスAnonymousObservable<T>の新しいインスタンスを作成し、フード下

var xs = Observable.Generate(0, x => x < 10, x => x + 1, x => x); 
var ys = xs.Where(x => x % 2 == 0); 
ys.Subscribe(y => { /* produces 0, 2, 4, 6, 8 */ }); 

は、例えば、このコードを取ります。 AnonymousObservable<T>のコンストラクタは、Subscribeへの呼び出しを受信するたびに使用する代理人Func<IObserver<T>, IDisposable>を受け取ります。

Reflector.NETからObservable.Generate<T>(...)ためわずかにクリーンアップコードは次のとおり

public static IObservable<TResult> Generate<TState, TResult>(
    TState initialState, 
    Func<TState, bool> condition, 
    Func<TState, TState> iterate, 
    Func<TState, TResult> resultSelector, 
    IScheduler scheduler) 
{ 
    return new AnonymousObservable<TResult>((IObserver<TResult> observer) => 
    { 
     TState state = initialState; 
     bool first = true; 
     return scheduler.Schedule((Action self) => 
     { 
      bool flag = false; 
      TResult local = default(TResult); 
      try 
      { 
       if (first) 
       { 
        first = false; 
       } 
       else 
       { 
        state = iterate(state); 
       } 
       flag = condition(state); 
       if (flag) 
       { 
        local = resultSelector(state); 
       } 
      } 
      catch (Exception exception) 
      { 
       observer.OnError(exception); 
       return; 
      } 
      if (flag) 
      { 
       observer.OnNext(local); 
       self(); 
      } 
      else 
      { 
       observer.OnCompleted(); 
      } 
     }); 
    }); 
} 

Action selfパラメータは、反復出力値ことが再帰呼び出しです。このコードのどこにもobserverが格納されていないか、値が複数のオブザーバに貼り付けられていることに気づくでしょう。このコードは新しいオブザーバーごとに1回実行されます。

ReflectorからObservable.Where<T>(...)のわずかにクリーンアップされたコード。NETは:

public static IObservable<TSource> Where<TSource>(
    this IObservable<TSource> source, 
    Func<TSource, bool> predicate) 
{ 
    return new AnonymousObservable<TSource>(observer => 
     source.Subscribe(x => 
     { 
      bool flag; 
      try 
      { 
       flag = predicate(x); 
      } 
      catch (Exception exception) 
      { 
       observer.OnError(exception); 
       return; 
      } 
      if (flag) 
      { 
       observer.OnNext(x); 
      } 
     }, ex => observer.OnError(ex),() => observer.OnCompleted)); 
} 

このコードでも、複数のオブザーバは追跡されません。 Subscribeは、観測者として自身のコードを実際に観測可能なsourceに渡します。

上記の例のコードでは、Whereを購読すると、Generateのサブスクリプションが作成され、これが観測可能なチェーンであることがわかります。実際には、一連のAnonymousObservableオブジェクトにサブスクライブコールをチェーンしています。

2つのサブスクリプションをお持ちの場合、2つのチェーンがあります。 1000回の定期購読をしている場合は、1,000本のチェーンがあります。

IObservable<T>IObserver<T>のインターフェイスがあるにもかかわらず、ちょっとしたこととして、実際にあなた自身のクラスでこれを実装することはほとんどありません。組み込みのクラスと演算子は、すべてのケースの99.99%を処理します。 IEnumerable<T>のようなものです。このインターフェースを自分で実装する頻度はどれくらいですか?

これが役立つか、さらに詳しい説明が必要な場合はお知らせください。あなたはIObservableを持っていて、別のスレッドで実行するようにサブスクリプションを強制する必要がある場合は

+0

ほとんどのサブスクリプションがユニークなチェーンを形成しているとは言いませんでした。 – ada

+0

public void Run() {var src = GetInput()。ToObservable(Scheduler.NewThread); var d = src.Subscribe( c => Console.WriteLine(c + "blah1" + DateTime.Now + System.Threading.Thread.CurrentThread.ManagedThreadId)); var d2 = src.Subscribe( c => Console.WriteLine(c + "cccc" + DateTime.Now + System.Threading.Thread.CurrentThread.ManagedThreadId)); System.Threading.Thread.Sleep(60000); }静的IEnumerableをか、getInput(){ 一方(真) {収率リターンint.Parse(Console.ReadLine())。 } } – ada

+0

申し訳ありません、上記のフォーマットが間違っています。しかし、私はd2とdがコンソール上に書いた各整数の代わりにコンソールに書き込むことに驚きました。これはあなたが「ほとんどのサブスクリプションがユニークなチェーンを形成する」という意味ですか? – ada

1

、あなたはObserveOn機能を使用することができます。

次のコードを実行すると、数値ジェネレータが別のスレッドコンテキストで実行されます。また、出力

Immediate: 10 
ThreadPool: 4 
TaskPool: 20 
TaskPool: 4 
ThreadPool: 24 
Immediate: 27 
Immediate: 10 
TaskPool: 24 
ThreadPool: 27 
Immediate: 24 
TaskPool: 26 
ThreadPool: 20 
Immediate: 26 
ThreadPool: 24 
TaskPool: 27 
Immediate: 28 
ThreadPool: 27 
TaskPool: 26 
Immediate: 10 

...あなたが使用したい、設定された優先順位、セット名など

void Main() 
{ 
    var numbers = Observable.Interval(TimeSpan.FromMilliseconds(100)); 

    var disposable = new CompositeDisposable() 
    { 
     numbers.ObserveOn(Scheduler.TaskPool).Subscribe(x=> Console.WriteLine("TaskPool: "+ Thread.CurrentThread.ManagedThreadId)), 
     numbers.ObserveOn(Scheduler.ThreadPool).Subscribe(x=> Console.WriteLine("ThreadPool: "+ Thread.CurrentThread.ManagedThreadId)), 
     numbers.ObserveOn(Scheduler.Immediate).Subscribe(x=> Console.WriteLine("Immediate: "+ Thread.CurrentThread.ManagedThreadId)) 
    }; 

    Thread.Sleep(1000); 
    disposable.Dispose(); 
} 

EventLoopSchedulerを使用してSystem.Threadを指定することができますCompositeDisposableを使用して、最後にすべてのサブスクリプションを廃棄する方法に注意してください。たとえば、LinqPadでこれをしない場合。 Observable.Intervalは、プロセスを終了するまでメモリ内で実行を続けます。

関連する問題