2012-01-01 8 views
3

複数の長時間実行する操作を並行して実行する必要があり、何らかの方法で進捗状況を報告したいと考えています。私の最初の研究から、IObservableはこのモデルに適合しているようです。アイデアは、intのIObservableを返すメソッドを呼び出すことです。intが報告されている割合は完全であり、メソッドを終了するとすぐに並列実行が開始されます。この観察可能なビューは、すべてのサブスクライバが特定の時点、例えば遅れた加入者は、実行全体が完了し、追跡するための進行がなくなったことを知ることしかできない。.Net RX:並列実行の進行状況を追跡

Observable.ForkJoinとObservable.Startを使用するのがこの問題に最も近いアプローチですが、メソッドから返すことができる単一のオブザーバブルにする方法を理解することはできません。

どうすれば実現できるのか、あるいは.Net RXを使用してこの問題を解決する別の方法があるかもしれません。ここで

答えて

3

で、私はおそらく、戻り値と操作が進捗状況を報告する方法としてBehaviorSubjectを用いる方法で開始します。例だけが必要な場合は、最後までスキップしてください。この回答の残りの部分は手順を説明します。

この回答のために、長時間実行される操作には、非同期に呼び出す独自の方法がないと仮定します。もしそうなら、次のステップは少し異なるかもしれません。次の作業は、ISchedulerを使って別のスレッドに作業を送ることです。呼び出し側は、必要に応じてスケジューラをパラメータとして使用するオーバーロードを行うことで、呼び出し元に作業の場所を選択させることができます(デフォルトスケジューラを選択しないオーバーロード)。 IScheduler.Schedulerのいくつかのオーバーロードがあります。そのうちのいくつかは拡張メソッドなので、あなたの状況に最も適しているものを調べる必要があります。私はここでActionしかかかりません。複数の操作がすべて並列に実行できる場合は、scheduler.Scheduleを複数回呼び出すことができます。

これの最も難しい部分は、おそらくどの時点でどのような進捗状況があるかを判断することです。一度に複数の操作を行っている場合は、現在進行状況を知るために完了した操作の数を把握する必要があります。あなたが提供した情報で、私はそれ以上は具体的ではありません。

最後に、操作がキャンセル可能な場合は、CancellationTokenをパラメータとして使用することをおすすめします。これを使用して、操作が開始される前にスケジューラーのキューに入っている間に操作を取り消すことができます。操作コードを正しく記述すると、トークンをキャンセル用に使用することもできます。

IObservable<int> DoStuff(/*args*/, 
         CancellationToken cancel, 
         IScheduler scheduler) 
{ 
    BehaviorSubject<int> progress; 
    //if you don't take it as a parameter, pick a scheduler 
    //IScheduler scheduler = Scheduler.ThreadPool; 

    var disp = scheduler.Schedule(() => 
    { 
     //do stuff that needs to run on another thread 

     //report progres 
     porgress.OnNext(25); 
    }); 
    var disp2 = scheduler.Schedule(...); 

    //if the operation is cancelled before the scheduler has started it, 
    //you need to dispose the return from the Schedule calls 
    var allOps = new CompositeDisposable(disp, disp2); 
    cancel.Register(allOps.Dispose); 

    return progress; 
} 
+0

これは本当に良いサンプルと説明です。ありがとうございました。私はもう一つ質問があります。長時間実行されているオペレーションが複数のグループに分割され、グループ内でオペレーションを並行して実行する必要がある場合はどうなりますか?この場合、スケジューラーの代わりにContinueWithを介してこの種の連鎖を許可するタスク・ライブラリーを使用するべきですが、BehaviorSubjectの使用は保持しますか? – andriys

+1

@andriysこれは1つのオプションです。別のオプションは、各グループをIObservableを返す別の関数に分割することです。これらの関数は、 'Observable.Concat'を介して連鎖することができます。しかし、チェーンの中で最初に観察できるもの以外のすべてを冷たくしたいと思うでしょう。 'Observable。Deferは、あなたが熱い観察可能なものを冷たいものに変換することを可能にする。 –

+0

それは素晴らしいです、指導に感謝! – andriys

0

が熱い観測可能にするための一つのアプローチ

// setup a method to do some work, 
// and report it's own partial progress 
Func<string, IObservable<int>> doPartialWork = 
    (arg) => Observable.Create<int>(obsvr => { 
     return Scheduler.TaskPool.Schedule(arg,(sched,state) => { 
      var progress = 0; 
      var cancel = new BooleanDisposable(); 
      while(progress < 10 && !cancel.IsDisposed) 
      { 
       // do work with arg 
       Thread.Sleep(550); 
       obsvr.OnNext(1); //report progress 
       progress++; 
      } 
      obsvr.OnCompleted(); 
      return cancel; 
     }); 
    }); 

var myArgs = new[]{"Arg1", "Arg2", "Arg3"}; 

// run all the partial bits of work 
// use SelectMany to get a flat stream of 
// partial progress notifications 
var xsOfPartialProgress = 
     myArgs.ToObservable(Scheduler.NewThread) 
       .SelectMany(arg => doPartialWork(arg)) 
        .Replay().RefCount(); 

// use Scan to get a running aggreggation of progress 
var xsProgress = xsOfPartialProgress 
        .Scan(0d, (prog,nextPartial) 
          => prog + (nextPartial/(myArgs.Length*10d))); 
+0

この関数は、コールドオブザーバブルを作成します。この作業は 'doPartialWork'への最初の呼び出しではなく、返されるobservableへのすべての*サブスクリプションで始まります。 –

+0

@ギデオン良い点 - 修正された答え –

+0

より良いが、まだ寒い。 –

関連する問題