2016-05-10 5 views
2

IObservableシーケンス内のすべての観測に対して呼び出すasnyc関数があり、配信を1つのイベントに制限します。消費者は、飛行中に複数のメッセージしか期待していない。私が正しく理解すれば、これもRX契約です。非同期関数で観測可能なシーケンスを購読する

このサンプルを検討:

static void Main() { 
    var ob = Observable.Interval(TimeSpan.FromMilliseconds(100)); 
    //var d = ob.Subscribe(async x => await Consume(x)); // Does not rate-limit. 
    var d = ob.Subscribe(x => Consume(x).Wait()); 
    Thread.Sleep(10000); 
    d.Dispose(); 
} 

static async Task<Unit> Consume(long count) { 
    Console.WriteLine($"Consuming {count} on thread {Thread.CurrentThread.ManagedThreadId}"); 
    await Task.Delay(750); 
    Console.WriteLine($"Returning on thread {Thread.CurrentThread.ManagedThreadId}"); 
    return Unit.Default; 
} 
Consume

機能は、処理時間を750ミリ秒を偽物とobはイベント毎に100ミリ秒を産生します。上記のコードは動作しますが、ランダムスレッドでtask.Wait()を呼び出します。私がコメントアウトされた行3のように代わりに購読すると、Consumeが同じレートで呼び出され、obがイベントを生成します(そして、このコメント文ではSubscribeのオーバーロードが何であるかわからないので、おそらくナンセンスです)。

したがって、観測可能なシーケンスから一度に1つのイベントをasync関数に正しく配信するにはどうすればよいですか?

+0

これを見てください:http://stackoverflow.com/questions/23006852/howto-call-back-async-function-from-rx-subscribe –

+0

@NedStoyanov:ああ、reijerhの答えhttp:// stackoverflow。 co.jp/a/30030553/1149924は機能しますが、驚くほど忍耐強くはありません。これはあなたがRXでやる*方法ですか? – kkm

答えて

6

サブスクライバは長時間実行されることは想定されていないため、サブスクリプションハンドラで長時間実行される非同期メソッドの実行をサポートしていません。

代わりに、非同期メソッドを別のシーケンスから値をとる単一の値観測可能なシーケンスと見なしてください。 Rxがデザインしたシーケンスを作成することができます。

これで飛躍しました。おそらく、@ReijherがHowto call back async function from rx subscribe?に作成したようなものがあります。

コードの分解は次のとおりです。

//The input sequence. Produces values potentially quicker than consumer 
Observable.Interval(TimeSpan.FromSeconds(1)) 
     //Project the event you receive, into the result of the async method 
     .Select(l => Observable.FromAsync(() => asyncMethod(l))) 
     //Ensure that the results are serialized 
     .Concat() 
     //do what you will here with the results of the async method calls 
     .Subscribe(); 

このシナリオでは、暗黙的なキューを作成しています。 プロデューサが消費者よりも速い場合は、待機中に値を収集するためにキューを使用する必要があります。 個人的には、データをキューに入れて明示的に指定することをお勧めします。 また、スケジューラを明示的に使用して、スラックを取得するスレッドモデルを通知することもできます。

これは、Rxの新人にとって一般的なハードル(サブスクライブハンドラで非同期を実行する)のようです。 たとえば、エラーモデル を破棄します。2.非同期モデルを混在させています(ここでは、ここではタスク) 3.サブスクライブは消費者です非同期シーケンスの構成の一部である。非同期メソッドは単なる値のシーケンスなので、そのビューはシーケンスの終わりではありませんが、結果はそうかもしれません。

UPDATE

ここでエラーモデルを破壊についてのコメントを説明するためには、OPサンプルのアップデートです。

void Main() 
{ 
    var ob = Observable.Interval(TimeSpan.FromMilliseconds(100)); 
    var d = ob.Subscribe(
     x => ConsumeThrows(x).Wait(), 
     ex=> Console.WriteLine("I will not get hit")); 

    Thread.Sleep(10000); 
    d.Dispose(); 
} 

static async Task<Unit> ConsumeThrows(long count) 
{ 
    return await Task.FromException<Unit>(new Exception("some failure")); 
    //this will have the same effect of bringing down the application. 
    //throw new Exception("some failure"); 
} 

ここでは、OnNextハンドラがスローするようにした場合は、その後、我々はRxのOnErrorハンドラによって保護されていないことがわかります。 例外は処理されず、アプリケーションを停止させる可能性があります。

+0

ありがとうございました、あなたの説明はどのようにこの作品の作品は非常に良いです!私の場合、消費者は与えられている(非同期と宣言されているgRPCストリーム)ので、私は選択肢があまりありません。しかし、通常のネットワーク環境ではボトルネックになるとは思っていませんが、そうであれば、合理的な回復戦略で処理できるよりも多くの問題がある可能性があります。 g。データを削除します。私は明示的なキューを検討しますが、それはそのままですが、このソリューションは確かに私の目的のために大丈夫です。 – kkm

+0

Lee、「エラーモデルを破る」とはどういう意味ですか?私はConsumeから例外をスローしているとき、OnErrorでシーケンスを終了します。一見すると、エラーは彼らがするべきことをするように見えます。私は何か不足していますか? – kkm

+0

OPのあなたの例では、エラー文 'throw new Exception("何らかの不具合 ");または' return await Task.FromException (新しい例外( "何らかの不具合")); 'アプリケーション。 'OnError'ハンドラを打つことはありません。しかし、非同期メソッドがSubscribeから「Select」に移動されるサンプルコードでは、シーケンスは正常に処理できるOnErrorで終了します。 –

関連する問題