2017-01-27 15 views
4

反応(Database polling with Reactive Extensions)を使用して、データベース・ポーリングに良い質問がまだありReactive Extensionsを使用して状態をポーリングするにはどうすればよいですか?

私は同様の質問を持っていますが、ひねりを加えた:私は、次の要求に前の結果から値を供給する必要があります。基本的に、私はこれをポーリングしたいと思います:

interface ResultSet<T> 
{ 
    int? CurrentAsOfHandle {get;} 
    IList<T> Results {get;} 
} 

Task<ResultSet<T>> GetNewResultsAsync<T>(int? previousRequestHandle); 

アイデアは私がGetNewResultsAsync

  • 私を呼び出すしたいと思います毎分これは前回の要求以降のすべての新しいアイテムを返すこと


    1. です前回の呼び出しのCurrentAsOfを引数としてpreviousRequestパラメータ
    2. に転送する場合は、次のの呼び出し

      return Observable.Create<IMessage>(async (observer, cancellationToken) => 
      { 
          int? currentVersion = null; 
      
          while (!cancellationToken.IsCancellationRequested) 
          { 
           MessageResultSet messageResultSet = await ReadLatestMessagesAsync(currentVersion); 
      
           currentVersion = messageResultSet.CurrentVersionHandle; 
      
           foreach (IMessage resultMessage in messageResultSet.Messages) 
            observer.OnNext(resultMessage); 
      
           await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken); 
          } 
      }); 
      

      がまた待っている間に、このバージョンはmessageResultSetを収集することを可能にすることに注意してください:は実際には基本的に、より良い方法があり、前の1


    1分後に起こるはず次の繰り返し(例えば、Scanを使用して、前の結果セットオブジェクトを次の繰り返しに渡すことができると思いました)

  • 答えて

    1

    これまでCESは:署名付きScan機能があります:

    IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source, 
        TAccumulate initialValue, Func<TAccumulate, TSource, TAccumulate> accumulator) 
    

    しかし、あなたは、アキュムレータ機能が観測を返し

    IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source, 
        TAccumulate initialValue, Func<TAccumulate, TSource, IObservable<TAccumulate>> accumulator) 
    

    ...のようなものを必要とし、スキャン機能が自動的にそれを軽減します次の呼び出しに渡します。

    ここScanの貧乏人の機能の実装です:今、私たちはこの空想MyObservableScan演算子を持っていることを

    public static IObservable<TAccumulate> MyObservableScan<TSource, TAccumulate>(this IObservable<TSource> source, 
        TAccumulate initialValue, Func<TAccumulate, TSource, IObservable<TAccumulate>> accumulator) 
    { 
        return source 
         .Publish(_source => _source 
          .Take(1) 
          .Select(s => accumulator(initialValue, s)) 
          .SelectMany(async o => (await o.LastOrDefaultAsync()) 
           .Let(m => _source 
            .MyObservableScan(m, accumulator) 
            .StartWith(m) 
           ) 
          ) 
          .Merge() 
         ); 
    } 
    
    //Wrapper to accommodate easy Task -> Observable transformations 
    public static IObservable<TAccumulate> MyObservableScan<TSource, TAccumulate>(this IObservable<TSource> source, 
        TAccumulate initialValue, Func<TAccumulate, TSource, Task<TAccumulate>> accumulator) 
    { 
        return source.MyObservableScan(initialValue, (a, s) => Observable.FromAsync(() => accumulator(a, s))); 
    } 
    
    //Function to prevent re-evaluation in functional scenarios 
    public static U Let<T, U>(this T t, Func<T, U> selector) 
    { 
        return selector(t); 
    } 
    

    我々はそれを減らす機能を組み込むためのビットを変更することができ、ということを考えると
    public static IObservable<TAccumulate> MyScan<TSource, TAccumulate>(this IObservable<TSource> source, 
        TAccumulate initialValue, Func<TAccumulate, TSource, TAccumulate> accumulator) 
    { 
        return source 
         .Publish(_source => _source 
          .Take(1) 
          .Select(s => accumulator(initialValue, s)) 
          .SelectMany(m => _source.MyScan(m, accumulator).StartWith(m)) 
         ); 
    } 
    

    比較的簡単に問題を解決することができます:

    var o = Observable.Interval(TimeSpan.FromMinutes(1)) 
        .MyObservableScan<long, ResultSet<string>>(null, (r, _) => Methods.GetNewResultsAsync<string>(r?.CurrentAsOfHandle)) 
    

    テストでは、アキュムレータTask/Observableの関数がソース内の間隔よりも長くかかると、observableが終了することに気付きました。なぜ私は分からない。誰かが訂正できれば、ずっと義務づけられている。

    0

    私はかなりのトリックを行うObservable.Generateに過負荷があることを発見しました。主な欠点は、asyncで動作しないことです。私は初期状態としてnullに渡し

    public static IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler);

    。私の条件として(無限に世論調査するために)x => trueを渡してください。 iterateの中では、渡された状態に基づいて実際のポーリングを行います。次にtimeSelectorにポーリング間隔を返します。

    ので:

    var resultSets = Observable.Generate<ResultSet<IMessage>, IEnumerable<IMessage>>(
        //initial (empty) result 
        new ResultSet<IMessage>(), 
    
        //poll endlessly (until unsubscription) 
        rs => true, 
    
        //each polling iteration 
        rs => 
        { 
         //get the version from the previous result (which could be that initial result) 
         int? previousVersion = rs.CurrentVersionHandle; 
    
         //here's the problem, though: it won't work with async methods :(
         MessageResultSet messageResultSet = ReadLatestMessagesAsync(currentVersion).Result; 
    
         return messageResultSet; 
        }, 
    
        //we only care about spitting out the messages in a result set 
        rs => rs.Messages, 
    
        //polling interval 
        TimeSpan.FromMinutes(1), 
    
        //which scheduler to run each iteration 
        TaskPoolScheduler.Default); 
    
    return resultSets 
        //project the list of messages into a sequence 
        .SelectMany(messageList => messageList); 
    
    +0

    もう一つの小さな欠点は、全体の結果セットの次のイテレーションを経て生き続けています。問題のバージョンでは、 'バージョン'だけが必要なので、 'メッセージ'部分をガベージコレクションすることができます。 –

    関連する問題