IObservable
シーケンス内のすべての観測に対して呼び出すasnyc
関数があり、配信を1つのイベントに制限します。消費者は、飛行中に複数のメッセージしか期待していない。私が正しく理解すれば、これもRX契約です。非同期関数で観測可能なシーケンスを購読する
このサンプルを検討:
static void Main() {
var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
//var d = ob.Subscribe(async x => await Consume(x)); // Does not rate-limit.
var d = ob.Subscribe(x => Consume(x).Wait());
Thread.Sleep(10000);
d.Dispose();
}
static async Task<Unit> Consume(long count) {
Console.WriteLine($"Consuming {count} on thread {Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(750);
Console.WriteLine($"Returning on thread {Thread.CurrentThread.ManagedThreadId}");
return Unit.Default;
}
Consume
機能は、処理時間を750ミリ秒を偽物とob
はイベント毎に100ミリ秒を産生します。上記のコードは動作しますが、ランダムスレッドでtask.Wait()
を呼び出します。私がコメントアウトされた行3のように代わりに購読すると、Consume
が同じレートで呼び出され、ob
がイベントを生成します(そして、このコメント文ではSubscribe
のオーバーロードが何であるかわからないので、おそらくナンセンスです)。
したがって、観測可能なシーケンスから一度に1つのイベントをasync
関数に正しく配信するにはどうすればよいですか?
これを見てください:http://stackoverflow.com/questions/23006852/howto-call-back-async-function-from-rx-subscribe –
@NedStoyanov:ああ、reijerhの答えhttp:// stackoverflow。 co.jp/a/30030553/1149924は機能しますが、驚くほど忍耐強くはありません。これはあなたがRXでやる*方法ですか? – kkm