Rxを使用して非同期ワークフローを実装しようとしていますが、私は完全に間違っているようです。私がやりたい何.net Rx:メッセージの順序付き一括処理
はこれです:
From an undefined asynchronous stream of un-parsed message strings (i.e. an IObservable<string>)
parse the message strings asynchronously, but preserve their order. (IObservable<Message>)
Batch up parsed Messages in groups of 100 or so (IObservable<IEnumerable<Message>>)
Send each batch, when complete, to the UI thread to be processed. Batches must arrive in the same order they were started.
私がオーダー保存を得るように見えることができない、と私はそれらをに期待するときにもRxは非同期的に物事をやっているとは思われません。
IObservableの代わりにIEnumerableを使用し、.AsParallel()。AsOrdered()演算子を呼び出して注文の保存を試みました。ここにコードがあります。あなたが持って考えると
private IObservable<IEnumerable<Message>> messageSource;
public IObservable<IEnumerable<Message>> MessageSource { get { return messageSource; } }
/// <summary>
/// Sub-classes of MessageProviderBase provide this IEnumerable to
/// generate unparsed message strings synchronously
/// </summary>
protected abstract IEnumerable<string> UnparsedMessages { get; }
public MessageProviderBase()
{
// individual parsed messages as a PLINQ query
var parsedMessages = from unparsedMessage in UnparsedMessages.AsParallel().AsOrdered()
select ParseMessage(unparsedMessage);
// convert the above PLINQ query to an observable, buffering up to 100 messages at a time
var batchedMessages
= parsedMessages.ToObservable().BufferWithTimeOrCount(TimeSpan.FromMilliseconds(200), 100);
// ISSUE #1:
// batchedMessages seems to call OnNext before all of the messages in its buffer are parsed.
// If you convert the IObservable<Message> it generates to an enumerable, it blocks
// when you try to enumerate it.
// Convert each batch to an IEnumerable
// ISSUE #2: Even if the following Rx query were to run asynchronously (it doesn't now, see the above comment),
// it could still deliver messages out of order. Only, instead of delivering individual
// messages out of order, the message batches themselves could arrive out of order.
messageSource = from messageBatch in batchedMessages
select messageBatch.ToEnumerable().ToList();
}
コンパイルが完了した後でも(Tupleは不変で、そこにはスカラがあるように見えます)、返されたIObservableは結果を提供しません。私はそれが自分のスレッドでデッドロックしていると信じています。 –
@Jeremy - あなたは正しいですが、コンパイルエラーは全然残念です。私はそれを修正し、カスタムクラスの 'Tuple'依存関係を取り替えました。しかし、デッドロックを再現することはできませんでした。そのため、タスクの長さをランダム化するサンプルの使用方法を示しました。また、Scheduler.Immediateを使用してテストして、タスクがすぐに完了したときに動作することを確認しました。 –
このデッドロックは、以前のワークフローでBufferWithTimeOrCount演算子を誤って使用していたようです。 –