2011-02-01 10 views
3

Rxを使用して非同期ワークフローを実装しようとしていますが、私は完全に間違っているようです。私がやりたい何.net Rx:メッセージの順序付き一括処理

はこれです:

From an undefined asynchronous stream of un-parsed message strings (i.e. an IObservable<string>) 
parse the message strings asynchronously, but preserve their order. (IObservable<Message>) 
Batch up parsed Messages in groups of 100 or so (IObservable<IEnumerable<Message>>) 
Send each batch, when complete, to the UI thread to be processed. Batches must arrive in the same order they were started. 

私がオーダー保存を得るように見えることができない、と私はそれらをに期待するときにもRxは非同期的に物事をやっているとは思われません。

IObservableの代わりにIEnumerableを使用し、.AsParallel()。AsOrdered()演算子を呼び出して注文の保存を試みました。ここにコードがあります。あなたが持って考えると

private IObservable<IEnumerable<Message>> messageSource; 
    public IObservable<IEnumerable<Message>> MessageSource { get { return messageSource; } } 

    /// <summary> 
    /// Sub-classes of MessageProviderBase provide this IEnumerable to 
    /// generate unparsed message strings synchronously 
    /// </summary> 
    protected abstract IEnumerable<string> UnparsedMessages { get; } 

    public MessageProviderBase() 
    { 
     // individual parsed messages as a PLINQ query 
     var parsedMessages = from unparsedMessage in UnparsedMessages.AsParallel().AsOrdered() 
               select ParseMessage(unparsedMessage); 

     // convert the above PLINQ query to an observable, buffering up to 100 messages at a time 
     var batchedMessages 
      = parsedMessages.ToObservable().BufferWithTimeOrCount(TimeSpan.FromMilliseconds(200), 100); 

     // ISSUE #1: 
     // batchedMessages seems to call OnNext before all of the messages in its buffer are parsed. 
     // If you convert the IObservable<Message> it generates to an enumerable, it blocks 
     // when you try to enumerate it. 

     // Convert each batch to an IEnumerable 
     // ISSUE #2: Even if the following Rx query were to run asynchronously (it doesn't now, see the above comment), 
     // it could still deliver messages out of order. Only, instead of delivering individual 
     // messages out of order, the message batches themselves could arrive out of order. 
     messageSource = from messageBatch in batchedMessages 
             select messageBatch.ToEnumerable().ToList(); 
    } 

答えて

3

私の答えは、Enigmativityのコードに基づいていますが、完了に関連するいくつかの競合条件を修正し、キャンセルとカスタムスケジューラのサポートも追加しました。ここで

public static IObservable<U> Fork<T, U>(this IObservable<T> source, 
    Func<T, U> selector) 
{ 
    return source.Fork<T, U>(selector, Scheduler.TaskPool); 
} 

public static IObservable<U> Fork<T, U>(this IObservable<T> source, 
    Func<T, U> selector, IScheduler scheduler) 
{ 
    return Observable.CreateWithDisposable<U>(observer => 
    { 
     var runningTasks = new CompositeDisposable(); 

     var lockGate = new object(); 
     var queue = new Queue<ForkTask<U>>(); 
     var completing = false; 
     var subscription = new MutableDisposable(); 

     Action<Exception> onError = ex => 
     { 
      lock(lockGate) 
      { 
       queue.Clear(); 
       observer.OnError(ex); 
      } 
     }; 

     Action dequeue =() => 
     { 
      lock (lockGate) 
      { 
       var error = false; 
       while (queue.Count > 0 && queue.Peek().Completed) 
       { 
        var task = queue.Dequeue(); 
        observer.OnNext(task.Value); 
       } 
       if (completing && queue.Count == 0) 
       { 
        observer.OnCompleted(); 
       } 
      } 
     }; 

     Action onCompleted =() => 
     { 
      lock (lockGate) 
      { 
       completing = true; 
       dequeue(); 
      } 
     }; 

     Action<T> enqueue = t => 
     { 
      var cancellation = new MutableDisposable(); 
      var task = new ForkTask<U>(); 

      lock(lockGate) 
      { 
       runningTasks.Add(cancellation); 
       queue.Enqueue(task); 
      } 

      cancellation.Disposable = scheduler.Schedule(() => 
      { 
       try 
       { 
        task.Value = selector(t); 

        lock(lockGate) 
        { 
         task.Completed = true; 
         runningTasks.Remove(cancellation); 
         dequeue(); 
        } 
       } 
       catch(Exception ex) 
       { 
        onError(ex); 
       } 
      }); 
     }; 

     return new CompositeDisposable(runningTasks, 
      source.AsObservable().Subscribe(
       t => { enqueue(t); }, 
       x => { onError(x); }, 
       () => { onCompleted(); } 
      )); 
    }); 
} 

private class ForkTask<T> 
{ 
    public T Value = default(T); 
    public bool Completed = false; 
} 

はそれをテストするためのタスクの実行時間をランダム化サンプルです:

AutoResetEvent are = new AutoResetEvent(false); 

Random rand = new Random(); 

Observable.Range(0, 5) 
    .Fork(i => 
    { 
     int delay = rand.Next(50, 500); 
     Thread.Sleep(delay); 

     return i + 1; 
    }) 
    .Subscribe(
     i => Console.WriteLine(i), 
     () => are.Set() 
    ); 

are.WaitOne(); 

Console.ReadLine(); 
+0

コンパイルが完了した後でも(Tupleは不変で、そこにはスカラがあるように見えます)、返されたIObservableは結果を提供しません。私はそれが自分のスレッドでデッドロックしていると信じています。 –

+0

@Jeremy - あなたは正しいですが、コンパイルエラーは全然残念です。私はそれを修正し、カスタムクラスの 'Tuple'依存関係を取り替えました。しかし、デッドロックを再現することはできませんでした。そのため、タスクの長さをランダム化するサンプルの使用方法を示しました。また、Scheduler.Immediateを使用してテストして、タスクがすぐに完了したときに動作することを確認しました。 –

+0

このデッドロックは、以前のワークフローでBufferWithTimeOrCount演算子を誤って使用していたようです。 –

2

::私がいる問題については、以下の注意事項を参照してください

IObservable<string> UnparsedMessages = ...; 
Func<string, Message> ParseMessage = ...; 

次にあなたがそうのようなSelectAsync拡張メソッドを使用することができます。

IObservable<Message> ParsedMessages = UnparsedMessages.SelectAsync(ParseMessage); 

SelectAsync拡張子メソッドは解析されていない各メッセージを非同期に処理し、結果が到着した順番に戻るようにします。

これが必要なのかどうか教えてください。ここで

コードです:私は仕事のためにこれを再訪する必要が終わったと、このコードのより堅牢なバージョンを書いた

public static IObservable<U> SelectAsync<T, U>(this IObservable<T> source, 
    Func<T, U> selector) 
{ 
    var subject = new Subject<U>(); 
    var queue = new Queue<System.Threading.Tasks.Task<U>>(); 
    var completing = false; 
    var subscription = (IDisposable)null; 

    Action<Exception> onError = ex => 
    { 
     queue.Clear(); 
     subject.OnError(ex); 
     subscription.Dispose(); 
    }; 

    Action dequeue =() => 
    { 
     lock (queue) 
     { 
      var error = false; 
      while (queue.Count > 0 && queue.Peek().IsCompleted) 
      { 
       var task = queue.Dequeue(); 
       if (task.Exception != null) 
       { 
        error = true; 
        onError(task.Exception); 
        break; 
       } 
       else 
       { 
        subject.OnNext(task.Result); 
       } 
      } 
      if (!error && completing && queue.Count == 0) 
      { 
       subject.OnCompleted(); 
       subscription.Dispose(); 
      } 
     } 
    }; 

    Action<T> enqueue = t => 
    { 
     if (!completing) 
     { 
      var task = new System.Threading.Tasks.Task<U>(() => selector(t)); 
      queue.Enqueue(task); 
      task.ContinueWith(tu => dequeue()); 
      task.Start(); 
     } 
    }; 

    subscription = source.Subscribe(
     t => { lock(queue) enqueue(t); }, 
     x => { lock(queue) onError(x); }, 
     () => { lock(queue) completing = true; }); 

    return subject.AsObservable(); 
} 

(リチャードの答えにも基づく。)

キーこのコードの利点は、明示的なキューが存在しないことです。私は純粋にタスクの継続を使用して結果を順番に戻しています。治療のように動作します! LINQを使用することができるようにする

public static IObservable<U> ForkSelect<T, U>(this IObservable<T> source, Func<T, U> selector) 
{ 
    return source.ForkSelect<T, U>(t => Task<U>.Factory.StartNew(() => selector(t))); 
} 

public static IObservable<U> ForkSelect<T, U>(this IObservable<T> source, Func<T, Task<U>> selector) 
{ 
    if (source == null) throw new ArgumentNullException("source"); 
    if (selector == null) throw new ArgumentNullException("selector"); 
    return Observable.CreateWithDisposable<U>(observer => 
    { 
     var gate = new object(); 
     var onNextTask = Task.Factory.StartNew(() => { }); 
     var sourceCompleted = false; 
     var taskErrored = false; 

     Action<Exception> onError = ex => 
     { 
      sourceCompleted = true; 
      onNextTask = onNextTask.ContinueWith(t => observer.OnError(ex)); 
     }; 

     Action onCompleted =() => 
     { 
      sourceCompleted = true; 
      onNextTask = onNextTask.ContinueWith(t => observer.OnCompleted()); 
     }; 

     Action<T> onNext = t => 
     { 
      var task = selector(t); 
      onNextTask = Task.Factory.ContinueWhenAll(new[] { onNextTask, task }, ts => 
      { 
       if (!taskErrored) 
       { 
        if (task.IsFaulted) 
        { 
         taskErrored = true; 
         observer.OnError(task.Exception); 
        } 
        else 
        { 
         observer.OnNext(task.Result); 
        } 
       } 
      }); 
     }; 

     var subscription = source 
      .AsObservable() 
      .Subscribe(
       t => { if (!sourceCompleted) lock (gate) onNext(t); }, 
       ex => { if (!sourceCompleted) lock (gate) onError(ex); }, 
       () => { if (!sourceCompleted) lock (gate) onCompleted(); }); 

     var @return = new CompositeDisposable(subscription); 

     return @return; 
    }); 
} 

そしてSelectManyのオーバーロードは、以下のとおりです。

public static IObservable<U> SelectMany<T, U>(this IObservable<T> source, Func<T, Task<U>> selector) 
{ 
    return source.ForkSelect<T, U>(selector); 
} 

public static IObservable<V> SelectMany<T, U, V>(this IObservable<T> source, Func<T, Task<U>> taskSelector, Func<T, U, V> resultSelector) 
{ 
    if (source == null) throw new ArgumentNullException("source"); 
    if (taskSelector == null) throw new ArgumentNullException("taskSelector"); 
    if (resultSelector == null) throw new ArgumentNullException("resultSelector"); 
    return source.Zip(source.ForkSelect<T, U>(taskSelector), (t, u) => resultSelector(t, u)); 
} 

したがって、これらの方法は次のように使用することができます:

var observableOfU = observableOfT.ForkSelect(funcOfT2U); 

または:

var observableOfU = observableOfT.ForkSelect(funcOfT2TaskOfU); 

または

var observableOfU = 
    from t in observableOfT 
    from u in funcOfT2TaskOfU(t) 
    select u; 

お楽しみください!

+1

は良いですね!競合状態を避けるために、Observable.CreateWithDisposableにコードをラップして件名を削除することをお勧めします。また、 'subscription'を返すこともできます。自動的に' OnCompleted/OnError'に配置されるので、そのコードも削除できます。また、メソッドの順序保護(onErrorの後の​​onCompletedなど)をすべて控えて、 'source.AsObservable()'を購読することができます。 –

+1

さらに、ローカル変数にサブスクリプションを割り当ててオブザーバからその変数を使用する場合は、 'MutableDisposable'を使用して、' NullReferenceException'という結果となる競合状態を防ぐことをお勧めします。 –

+1

最後に(if .NET 4を使用している場合は、 'Disposable.Create'を介して' CancellationTokenSource'をサポートすることができます(そして、 'CompositeDisposable'を使ってソースサブスクリプションと結合して)、エラーがあれば他のタスクを取り消すことができます。 –

関連する問題