2012-04-24 4 views
18

C#4.0の悲しい頃、IEnumerableの "yield return"継続をハッキングしてオブザーバブルを待つことで、GUIスレッドで非同期ワークフローを可能にする次の "WorkflowExecutor"クラスを作成しました。したがって、次のコードは、button1Clickでテキストを更新する単純なワークフローを開始し、button2をクリックするのを待ち、1秒後にループします。非同期の仕事をするために「利回り」継続を使ってobservableで待っています

public sealed partial class Form1 : Form { 
    readonly Subject<Unit> _button2Subject = new Subject<Unit>(); 
    readonly WorkflowExecutor _workflowExecutor = new WorkflowExecutor(); 

    public Form1() { 
     InitializeComponent(); 
    } 

    IEnumerable<IObservable<Unit>> CreateAsyncHandler() { 
     Text = "Initializing"; 
     var scheduler = new ControlScheduler(this); 
     while (true) { 
      yield return scheduler.WaitTimer(1000); 
      Text = "Waiting for Click"; 
      yield return _button2Subject; 
      Text = "Click Detected!"; 
      yield return scheduler.WaitTimer(1000); 
      Text = "Restarting"; 
     } 
    } 

    void button1_Click(object sender, EventArgs e) { 
     _workflowExecutor.Run(CreateAsyncHandler()); 
    } 

    void button2_Click(object sender, EventArgs e) { 
     _button2Subject.OnNext(Unit.Default); 
    } 

    void button3_Click(object sender, EventArgs e) { 
     _workflowExecutor.Stop(); 
    } 
} 

public static class TimerHelper { 
    public static IObservable<Unit> WaitTimer(this IScheduler scheduler, double ms) { 
     return Observable.Timer(TimeSpan.FromMilliseconds(ms), scheduler).Select(_ => Unit.Default); 
    } 
} 

public sealed class WorkflowExecutor { 
    IEnumerator<IObservable<Unit>> _observables; 
    IDisposable _subscription; 

    public void Run(IEnumerable<IObservable<Unit>> actions) { 
     _observables = (actions ?? new IObservable<Unit>[0]).GetEnumerator(); 
     Continue(); 
    } 

    void Continue() { 
     if (_subscription != null) { 
      _subscription.Dispose(); 
     } 
     if (_observables.MoveNext()) { 
      _subscription = _observables.Current.Subscribe(_ => Continue()); 
     } 
    } 

    public void Stop() { 
     Run(null); 
    } 
} 

アイデアのスマートな部分、ダニエルエリカーのAsyncIOPipeのアイデアから撮影された:http://smellegantcode.wordpress.com/2008/12/05/asynchronous-sockets-with-yield-return-of-lambdas/、その後、私はそれの上に反応性のフレームワークを追加しました。

これでC#5.0の非同期機能を使用してこれを書き直すのに問題がありましたが、それは簡単なことです。 Observableをタスクに変換すると、それらは1回だけ実行され、whileループは2回目にクラッシュします。どのような助けを借りても大丈夫でしょう。

WorkflowExecutorは非同期/待機メカニズムによって私に与えられていることは何ですか? asyncでできることは何ですか?私はWorkflowExecutorで(同じような量のコードを与えて)できません。

+0

あなたはTask's 'への変換をどのように正確に行うのですか?見た目はどうやってクラッシュするのですか? – svick

+1

そして 'await'はこの種の非同期よりも多くの利点がありますが、大きな違いの1つは待ち時間から戻ることです。例えば。 'string s = client.DownloadStringAsync(url);を待ちます。 – svick

答えて

24

あなたが気づいたように、タスクは、Observableの「イベントの流れ」とは対照的に、1回限りの使用です。この考え方(私見)の良い方法はRx team's post about 2.0 Beta上の2x2のチャートです:

2x2 chart for task vs observable

状況(イベントの「流れ」対一時間)に応じて、観察可能でより多くの意味を作るかもしれません保ちます。

Reactive 2.0 Betaまでホップすることができれば、それを使って観測可能なものを待つことができます。例えば、あなたのコードの「/を待つ非同期」での私自身の試み(概算)バージョンは、次のようになります。

public sealed partial class Form1 : Form 
{ 
    readonly Subject<Unit> _button2Subject = new Subject<Unit>(); 

    private bool shouldRun = false; 

    public Form1() 
    { 
     InitializeComponent(); 
    } 

    async Task CreateAsyncHandler() 
    { 
     Text = "Initializing"; 
     while (shouldRun) 
     { 
      await Task.Delay(1000); 
      Text = "Waiting for Click"; 
      await _button2Subject.FirstAsync(); 
      Text = "Click Detected!"; 
      await Task.Delay(1000); 
      Text = "Restarting"; 
     } 
    } 

    async void button1_Click(object sender, EventArgs e) 
    { 
     shouldRun = true; 
     await CreateAsyncHandler(); 
    } 

    void button2_Click(object sender, EventArgs e) 
    { 
     _button2Subject.OnNext(Unit.Default); 
    } 

    void button3_Click(object sender, EventArgs e) 
    { 
     shouldRun = false; 
    } 
} 
+0

'Task'は1回限りの使用ですが、' Task's以外のものを '待つ 'ことができます。ですから、 'IObservable '全体を表すことができる待ち時間を作成することは可能です。 – svick

+0

これは私がコードサンプルで行ったことです。 Rx 2.0では、オブザーバブルを待つことができます。デフォルト動作では、Observableの最後の要素が返されます。なぜFirstAsyncであるのですか? –

22

ジェームズが言及したように、あなたはRxのv2.0のベータ版で始まるIObservable <T>シーケンスを待つことができます。この動作は、最後の要素(OnCompletedの前)を返すか、または観察されたOnErrorをスローすることです。シーケンスに要素が含まれていない場合は、InvalidOperationExceptionが送出されます。

これを使用して注意してください、あなたが得ることができるすべての他の所望の動作:

  • はxs.FirstAsync()
  • 単一の値のみがxs.SingleAsyncを待っていることでありますことを確認する(待っていることにより、最初の要素を取得します)
  • あなたは空の配列と罰金たら、すべての要素を取得するにはxs.DefaultIfEmpty()
  • を待つ、(xs.ToArrayを待つ)または(xs.ToListを待つ)

あなたは集計の結果を計算するように、さらに多くの派手なことを行うが、DOおよびスキャンを使用して中間値を観察することができます。

var xs = Observable.Range(0, 10, Scheduler.Default); 

var res = xs.Scan((x, y) => x + y) 
      .Do(x => { Console.WriteLine("Busy. Current sum is {0}", x); }); 

Console.WriteLine("Done! The sum is {0}", await res); 
+1

これは、IObservable が正常に構築されたことを待っている最近のプロジェクトで驚いた後に探していた情報です。共有してくれてありがとう。 – jpierson

関連する問題