2015-11-14 8 views
8

レガシーシステムの状態を知るために、適時にデータベースにクエリを実行する必要があります。私はObservableの周りにクエリをラップすると思ったが、私はそれを行う正しい方法を知らない。Reactive Extensionsによるデータベースポーリング

基本的に、5秒ごとに同じクエリになります。しかし、私はこれらの問題に直面しなければならないと恐れています。

  • クエリの実行に10秒かかる場合はどうなりますか?私は は、前の処理がまだ処理されている場合は、新しいクエリを実行したくありません。
  • また、タイムアウトが必要です。例えば、現在のクエリが を20秒後に実行しない場合、有益なメッセージは に記録され、新しい試み(同じクエリ)が送られるべきである。

エクストラ詳細:

  • クエリがあるだけでステータスコードのリスト(を故障したワーキング)でデータセットを返しますSELECT
  • 観測可能なシーケンスは常に、クエリから受信した最新のデータを取ります。これは、スイッチの拡張メソッドのようなものです。
  • 私は、タスクにデータベースクエリ(長時間の操作)をラップしたいと思いますが、それが最適なオプションかどうかはわかりません。

私は、クエリが別のスレッドで実行されるべきだと確信していますが、どのように観察可能なように見えるか分かりません。Introduction to Rx by Lee Campbellです。

+0

詳細を追加できますか?クエリーはどのデータを戻しますか?クエリは単一のオブジェクトを返しますか?タイムアウトの場合は、新しいクエリを開始したいとします。そのクエリは何ですか? –

答えて

14

Rxを使用して別のシステムをポーリングするのはかなり一般的なケースです。ほとんどの人はgo-to演算子としてObservable.Intervalを使用しますが、ほとんどの場合、問題ありません。

ただし、タイムアウトと再試行に関する特定の要件があります。あなたが

  • ToObservable()をオーバーランしているクエリを特定し、データベースに指定された時間
  • Timeoutにクエリを実行できるようにするために

    • Observable.Timer:この場合、私はあなたの演算子の組み合わせを使用したほうが良いと思いますTask結果を観察可能なシーケンスにマッピングします。あなたは成功したデータベースクエリの後に続けることができるようにタイムアウト
    • Repeat後に回復できるようにする
    • Retry。これはまた、前のデータベースクエリの完了と次のクエリの開始との間の初期期間/ギャップを維持する。

    この作業LINQPadスニペットは、クエリが正常に動作しますが表示されるはずです:

    void Main() 
    { 
        var pollingPeriod = TimeSpan.FromSeconds(5); 
        var dbQueryTimeout = TimeSpan.FromSeconds(10); 
    
        //You will want to have your Rx query timeout after the expected silence of the timer, and then further maximum silence. 
        var rxQueryTimeOut = pollingPeriod + dbQueryTimeout; 
    
        var scheduler = new EventLoopScheduler(ts => new Thread(ts) { Name = "DatabasePoller" }); 
    
        var query = Observable.Timer(pollingPeriod, scheduler) 
            .SelectMany(_ => DatabaseQuery().ToObservable()) 
            .Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler) 
            .Retry() //Loop on errors 
            .Repeat(); //Loop on success 
    
        query.StartWith("Seed") 
         .TimeInterval(scheduler) //Just to debug, print the timing gaps. 
         .Dump(); 
    } 
    
    // Define other methods and classes here 
    private static int delay = 9; 
    private static int delayModifier = 1; 
    public async Task<string> DatabaseQuery() 
    { 
        //Oscillate the delay between 3 and 12 seconds 
        delay += delayModifier; 
        var timespan = TimeSpan.FromSeconds(delay); 
        if (delay < 4 || delay > 11) 
         delayModifier *= -1; 
        timespan.Dump("delay"); 
        await Task.Delay(timespan); 
        return "Value"; 
    } 
    

    結果は次のようになります。

    Seed 00:00:00.0125407 
    Timeout 00:00:15.0166379 
    Timeout 00:00:15.0124480 
    Timeout 00:00:15.0004520 
    Timeout 00:00:15.0013296 
    Timeout 00:00:15.0140864 
    Value 00:00:14.0251731 
    Value 00:00:13.0231958 
    Value 00:00:12.0162236 
    Value 00:00:11.0138606 
    

    サンプルの重要な一部である....

    var query = Observable.Timer(TimeSpan.FromSeconds(5), scheduler) 
           .SelectMany(_ => DatabaseQuery().ToObservable()) 
           .Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler) 
           .Retry() //Loop on errors 
           .Repeat(); //Loop on success 
    

    編集: ここでは、このソリューションに到達する方法の詳細な説明です。 https://github.com/LeeCampbell/RxCookbook/blob/master/Repository/Polling.md

  • +0

    ここでEventLoopSchedulerを使用してクエリが同じスレッドで実行されることを確認する主な目的はありますか?これは、RXを使って別のシステムをポーリングするときに行く最善の方法ですか? – jumpercake

    +0

    ここが正しいです。この場合は、タスク/スレッドプールと競合しないようにすることをお勧めします。この例でスレッドに名前を付けると、ほとんどのログ製品もそれを公開します。ただし、ここでEventLoopSchedulerを使用することは必須ではなく、Rxは作業のシリアライズを維持します。 –

    +0

    実装に到達するための動作を含むサンプルコードへのリンクを含むように更新されました –

    1

    私は、これはあなたが何をすべきかだと思う:

    var query = 
        from n in Observable.Interval(TimeSpan.FromSeconds(5.0)) 
        from ds in Observable.Amb(
         Observable.Start(() => /* Your DataSet query */), 
         Observable 
          .Timer(TimeSpan.FromSeconds(10.0)) 
          .Select(_ => new DataSet("TimeOut"))) 
        select ds; 
    

    これは、5秒の実行間隔で新しいクエリをトリガします。最後のものが始まってから5秒ではなく、最後のものが終了してから5秒です。

    クエリを試してみますが、.Ambには、10秒後に特別なDataSetを返すタイマーがあります。 10秒前にクエリが終了すると勝ちますが、それ以外の場合はDataSetが返されます。 .Amb演算子は、基本的には "競合"演算子です - 値を生成するための最初の観測値が勝ちます。

    +0

    うわー、観察可能な組み合わせはかなり印象的です!両方の観測が競争し、Ambが最初に来ることを意味します。私を傷つけるのは、2つの塊からネストされたものです。その部分は、「これは5秒間の実行間隔で新しいクエリを起動します。最後の開始から5秒ではなく、最後の終了から5秒です」という部分を許可する部分ですか? – SuperJMN

    +1

    @ SuperJMN - ありがとう。 Observable.Interval(TimeSpan.FromSeconds(5.0))は、すべての加入者が作業を終えてから5秒後にのみ起動します。したがって、クエリの2番目の部分が動作している場合、間隔が終了してから5秒後に間隔が発生しません。 – Enigmativity

    +0

    私はあなたの 'Observable.Interval'を消費するために' SelectMany'を使用しているので、あなたは消費しないので、5秒ごとにチェックされます。 –

    関連する問題