あなたが書いたコードは、ほぼ同時に観察可能なものを実行するためのものです。あなたはこのようあなたのオブザーバを記述する場合:
public class Subscriber : IObserver<int>
{
public void OnNext(int a)
{
Console.WriteLine("{0} on {1} at {2}",
a,
Thread.CurrentThread.ManagedThreadId,
DateTime.Now.ToString());
}
public void OnError(Exception e)
{ }
public void OnCompleted()
{ }
}
次に、このコードは実行されている:
var observable =
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Select(x => (int)x)
.Take(5)
.ObserveOn(Scheduler.ThreadPool);
var sub1 = new Subscriber();
var sub2 = new Subscriber();
observable.Subscribe(sub1);
observable.Subscribe(sub2);
Thread.Sleep(10000);
すると、次が生成されます:
0 on 28 at 2011/10/20 00:13:49
0 on 16 at 2011/10/20 00:13:49
1 on 29 at 2011/10/20 00:13:50
1 on 22 at 2011/10/20 00:13:50
2 on 27 at 2011/10/20 00:13:51
2 on 29 at 2011/10/20 00:13:51
3 on 27 at 2011/10/20 00:13:52
3 on 19 at 2011/10/20 00:13:52
4 on 27 at 2011/10/20 00:13:53
4 on 27 at 2011/10/20 00:13:53
それはすでに別のスレッドで並行してサブスクリプションを実行しています。
私が使用した重要なことは、.ObserveOn
拡張メソッドでした。これがこの作業をしました。
オブザーバーは、通常、オブザーバーの同じインスタンスを共有しないことに注意してください。オブザーバブルに加入することにより、オブザーバブルのソースからオブザーバまで、観察可能なオペレータの固有の「連鎖」が効果的に配線されます。これは、列挙型でGetEnumerator
を2回呼び出した場合と同じですが、同じ列挙型インスタンスを共有しないため、2つの一意のインスタンスが取得されます。
ここで、私はチェーンが意味することを説明したいと思います。 Reflector.NETで抽出されたコードをObservable.Generate
& Observable.Where
から説明します。両方Generate
& Where
各々は、内部のRxクラスAnonymousObservable<T>
の新しいインスタンスを作成し、フード下
var xs = Observable.Generate(0, x => x < 10, x => x + 1, x => x);
var ys = xs.Where(x => x % 2 == 0);
ys.Subscribe(y => { /* produces 0, 2, 4, 6, 8 */ });
:
は、例えば、このコードを取ります。 AnonymousObservable<T>
のコンストラクタは、Subscribe
への呼び出しを受信するたびに使用する代理人Func<IObserver<T>, IDisposable>
を受け取ります。
Reflector.NETからObservable.Generate<T>(...)
ためわずかにクリーンアップコードは次のとおり
public static IObservable<TResult> Generate<TState, TResult>(
TState initialState,
Func<TState, bool> condition,
Func<TState, TState> iterate,
Func<TState, TResult> resultSelector,
IScheduler scheduler)
{
return new AnonymousObservable<TResult>((IObserver<TResult> observer) =>
{
TState state = initialState;
bool first = true;
return scheduler.Schedule((Action self) =>
{
bool flag = false;
TResult local = default(TResult);
try
{
if (first)
{
first = false;
}
else
{
state = iterate(state);
}
flag = condition(state);
if (flag)
{
local = resultSelector(state);
}
}
catch (Exception exception)
{
observer.OnError(exception);
return;
}
if (flag)
{
observer.OnNext(local);
self();
}
else
{
observer.OnCompleted();
}
});
});
}
Action self
パラメータは、反復出力値ことが再帰呼び出しです。このコードのどこにもobserver
が格納されていないか、値が複数のオブザーバに貼り付けられていることに気づくでしょう。このコードは新しいオブザーバーごとに1回実行されます。
ReflectorからObservable.Where<T>(...)
のわずかにクリーンアップされたコード。NETは:
public static IObservable<TSource> Where<TSource>(
this IObservable<TSource> source,
Func<TSource, bool> predicate)
{
return new AnonymousObservable<TSource>(observer =>
source.Subscribe(x =>
{
bool flag;
try
{
flag = predicate(x);
}
catch (Exception exception)
{
observer.OnError(exception);
return;
}
if (flag)
{
observer.OnNext(x);
}
}, ex => observer.OnError(ex),() => observer.OnCompleted));
}
このコードでも、複数のオブザーバは追跡されません。 Subscribe
は、観測者として自身のコードを実際に観測可能なsource
に渡します。
上記の例のコードでは、Where
を購読すると、Generate
のサブスクリプションが作成され、これが観測可能なチェーンであることがわかります。実際には、一連のAnonymousObservable
オブジェクトにサブスクライブコールをチェーンしています。
2つのサブスクリプションをお持ちの場合、2つのチェーンがあります。 1000回の定期購読をしている場合は、1,000本のチェーンがあります。
IObservable<T>
とIObserver<T>
のインターフェイスがあるにもかかわらず、ちょっとしたこととして、実際にあなた自身のクラスでこれを実装することはほとんどありません。組み込みのクラスと演算子は、すべてのケースの99.99%を処理します。 IEnumerable<T>
のようなものです。このインターフェースを自分で実装する頻度はどれくらいですか?
これが役立つか、さらに詳しい説明が必要な場合はお知らせください。あなたはIObservableを持っていて、別のスレッドで実行するようにサブスクリプションを強制する必要がある場合は
熱く見えるか冷たいですか? – Richard