2011-10-30 12 views
1

リアクティブエクステンションを学んでいます。このようなタスクにマッチするかどうかを調べようとしています。リアクティブエクステンションでリクエストのバッチを処理する

私は、作業の単位として要求のバッチを処理し、すべての要求が完了したときにコールバックを呼び出すProcess()メソッドを持っています。

ここで重要なことは、実装ごとに各要求が同期または非同期のいずれかのコールバックを呼び出すことであり、バッチプロセッサは両方を処理できる必要があります。

バッチプロセッサからスレッドが開始されない場合、必要に応じて新しいスレッド(またはその他の非同期実行)が要求ハンドラ内から開始されます。これがrxのユースケースと一致するかどうかはわかりません。

public void Process(ICollection<IRequest> requests, Action<List<IResponse>> onCompleted) 
{ 
    IUnitOfWork uow = null; 
    try 
    { 
     uow = unitOfWorkFactory.Create(); 

     var responses = new List<IResponse>(); 
     var outstandingRequests = requests.Count; 
     foreach (var request in requests) 
     { 
      var correlationId = request.CorrelationId; 
      Action<IResponse> requestCallback = response => 
      { 
       response.CorrelationId = correlationId; 
       responses.Add(response); 
       outstandingRequests--; 
       if (outstandingRequests != 0) 
        return; 

       uow.Commit(); 
       onCompleted(responses); 
      }; 

      requestProcessor.Process(request, requestCallback); 
     } 
    } 
    catch(Exception) 
    { 
     if (uow != null) 
      uow.Rollback(); 
    } 

    if (uow != null) 
     uow.Commit(); 
}   

どのようにあなたは、この使用してRXを実装します。このような

私の現在の作業コードルックス(ほぼ)?それは妥当ですか?

まだ返されていない非同期要求があっても、作業単位は同期してコミットされることに注意してください。あなたが同期または非同期の結果を返すことが自由だとして

答えて

3

これへの私のアプローチを:あなたはときに、すべてのIObservablesが完了知りたい場合は、最後に返され項目に観察可能で「n」の項目を向けるだろうと、集計は、これを行います2段階です。

public static class ObservableEx 
{ 
    public static Func<T, IObservable<R>> FromAsyncCallbackPattern<T, R>(
     this Action<T, Action<R>> call) 
    { 
     if (call == null) throw new ArgumentNullException("call"); 
     return t => 
     { 
      var subject = new AsyncSubject<R>(); 
      try 
      { 
       Action<R> callback = r => 
       { 
        subject.OnNext(r); 
        subject.OnCompleted(); 
       }; 
       call(t, callback); 
      } 
      catch (Exception ex) 
      { 
       return Observable.Throw<R>(ex, Scheduler.ThreadPool); 
      } 
      return subject.AsObservable<R>(); 
     }; 
    } 
} 

次に、IObservable<IResponse> Process(IObservable<IRequest> requests)にコールvoid Process(ICollection<IRequest> requests, Action<List<IResponse>> onCompleted)を回す:

まずAction<T, Action<R>>Func<T, IObservable<R>>になり、汎用演算作成

public IObservable<IResponse> Process(IObservable<IRequest> requests) 
{ 
    Func<IRequest, IObservable<IResponse>> rq2rp = 
     ObservableEx.FromAsyncCallbackPattern 
      <IRequest, IResponse>(requestProcessor.Process); 

    var query = (
     from rq in requests 
     select rq2rp(rq)).Concat(); 

    var uow = unitOfWorkFactory.Create(); 
    var subject = new ReplaySubject<IResponse>(); 

    query.Subscribe(
     r => subject.OnNext(r), 
     ex => 
     { 
      uow.Rollback(); 
      subject.OnError(ex); 
     }, 
     () => 
     { 
      uow.Commit(); 
      subject.OnCompleted(); 
     }); 

    return subject.AsObservable(); 
} 

を今だけでなく、これは処理の非同期を実行しませんしかし、結果の正しい順序も保証されます。

実際には、あなたがコレクションで始まるされているので、あなたもこれを行うことができます:

var rqs = requests.ToObservable(); 

var rqrps = rqs.Zip(Process(rqs), 
    (rq, rp) => new 
    { 
     Request = rq, 
     Response = rp, 
    }); 

は、その後、あなたは、観察を持っているだろうとまでペアCorrelationIdプロパティを必要とすることなく、各要求/応答。

こちらがお役に立てば幸いです。

+0

これは非常に、実際に役立ちます!非常に精巧な答えをありがとう。私がここで本当に頭を上げることができない唯一のことは、すべての要求が応答されたときにどうすれば "何か"をすることができるかということです。どのように私は言う:と今すべての要求が応答されると、これらの要求/応答のペアでこれらのメソッドを呼び出す...私は "rqrps" observableを購読しようとした、リストの応答を収集し、メソッドは完了しましたが、成功しませんでした(これ以降のリストで回答を収集するのはちょっと奇妙です)。私は本当にそれについてのいくつかのヒントを感謝しています:) – asgerhallas

+0

doh! ...間違った過負荷と呼ばれる:) – asgerhallas

+0

@asgerhallas - すべてのペアが戻ってくるのを待つ必要はありません。さもなければ '.ToArray()' observable演算子を見ると、 'IObservable 'が 'IObservable 'に変わります。n個の値を観測可能にして、あなたのニーズに合った完璧なオペレータのように聞こえます。 – Enigmativity

1

これは、受信の天才の一部です:

public IObservable<int> AddNumbers(int a, int b) { 
    return Observable.Return(a + b); 
} 

public IObservable<int> AddNumbersAsync(int a, int b) { 
    return Observable.Start(() => a + b, Scheduler.NewThread); 
} 

はどちらもIObservableタイプを持っているので、彼らは同じように動作します。

IObservable<int> listOfObservables[]; 

listObservables.ToObservable() 
    .Merge() 
    .Aggregate(0, (acc, x) => acc+1) 
    .Subscribe(x => Console.WriteLine("{0} items were run", x)); 
+0

非常に良い、ありがとう!今私は、コールバックを取得し、そのコールバックの呼び出しを観察されているものに変換する、現在のrequestProcessor.Process()を取る必要がある場合、最も簡単な方法は何ですか? – asgerhallas

+0

ああ...これを見つけたhttp://stackoverflow.com/questions/5827178/using-rx-framework-for-async-calls-using-the-void-asyncmethodactiont-callback正しい方向性のようです。 – asgerhallas