EDIT:私は非常に間違っていました。 TransformBlock
は、パラレル化のために構成されていても、同じ順序でアイテムを返します。そのため、元の回答のコードはまったく役に立たず、代わりにTransformBlock
を使用することができます。
オリジナルの答え:AsOrdered()
とPLINQ:私の知る限りは、.NETで唯一の並列性構築物は、彼らが入って来た順に処理アイテムを返すことができます知っているよう
。しかし、PLINQはあなたが望むものに合っていないようです。
TPL Dataflowは、一方でよく適合しますが、並列処理をサポートし、同時にアイテムを返すブロックはありません(TransformBlock
は両方をサポートしていますが、同時)。幸運なことに、データフローブロックは合成可能性を念頭に設計されているので、それを行う独自のブロックを構築できます。
しかし、まず、結果をどのように整理するかを理解する必要があります。あなたが示唆したように、いくつかの同期メカニズムと並行して辞書を使用することは確かに有効です。しかし、より単純な解決法があると思います。Task
のキューを使用してください。出力タスクでは、Task
をデキューし、(非同期に)完了するのを待ちます。非同期に完了すると、結果が送信されます。キューが空の場合にはまだ同期が必要ですが、賢明に使用するキューを選択すると空きが得られます。
したがって、一般的な考えは次のようなものです:私たちが書いているのは、入力と出力があるIPropagatorBlock
です。カスタムIPropagatorBlock
を作成する最も簡単な方法は、入力を処理する1つのブロックと、結果を生成し、DataflowBlock.Encapsulate()
を使用して1つのブロックとして扱う別のブロックを作成することです。
入力ブロックは正しい順序で入力項目を処理する必要があるため、そこで並列化は行われません。新しいTask
(実際には、Task
の結果を後で設定できるように、TaskCompletionSource
)を作成し、それをキューに追加して処理のためにアイテムを送信し、正しい結果を設定する何らかの方法を使用します。Task
。このブロックを何かにリンクする必要はないので、ActionBlock
を使用できます。
出力ブロックは、キューからTask
秒かかることがあり、非同期にそれらを待ってからそれらを送信する必要があります。しかし、すべてのブロックにはキューが埋め込まれており、デリゲートを取るブロックには非同期待機が組み込まれているため、これは非常に簡単です:new TransformBlock<Task<TOutput>, TOutput>(t => t)
。このブロックは、キューとしても出力ブロックとしても機能します。このため、私たちはどんな同期にも対処する必要はありません。
パズルの最後の部分は、実際には並行して項目を処理しています。このために、別のActionBlock
を使用することができます。今回はMaxDegreeOfParallelism
と設定します。それは入力を受け取り、それを処理し、正しいTask
の結果を待ち行列に入れるでしょう。
が一緒に入れて、それは次のようになります。
public static IPropagatorBlock<TInput, TOutput>
CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
Func<TInput, TOutput> transform)
{
var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);
var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
tuple => tuple.Item2(transform(tuple.Item1)),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var enqueuer = new ActionBlock<TInput>(
async item =>
{
var tcs = new TaskCompletionSource<TOutput>();
await processor.SendAsync(
new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
await queue.SendAsync(tcs.Task);
});
enqueuer.Completion.ContinueWith(
_ =>
{
queue.Complete();
processor.Complete();
});
return DataflowBlock.Encapsulate(enqueuer, queue);
}
をそんなに話した後、それはコードのかなり少量だ、私は思います。
パフォーマンスが気になるので、このコードを微調整する必要があります。たとえば、オーバーサブスクリプションを避けるために、のブロックのMaxDegreeOfParallelism
をEnvironment.ProcessorCount
のように設定すると意味があります。また、待ち時間がスループットよりも重要である場合は、同じブロックのMaxMessagesPerTask
を1(または別の小さい番号)に設定して、アイテムの処理が終了した時点でただちに出力に送信することが理にかなっています。
また、着信アイテムを調整する場合は、BoundedCapacity
をenqueuer
と設定できます。
私は最初に消化して試してみたいと思っています。それらのためにたくさんありがとう、それは少なくともupvoteに値します;-)私はそれらのアイデアと遊ぶことができ、私は戻ってきます。キューイングタスクは非常に意味があり、なぜ私はそれを早く取得しなかったのだろうかと思います。 –
ok私はあなたのポストを通過し、TPL Dataflowで読んで、あなたの提案されたソリューションを完全に理解するための質問をいくつかここで組み合わせます:(1)Transformblockが既に存在する場合、カスタムIPropagatorBlockとIDataflowBlock.Encapsulate (2)ブロックのリンクを実際にどのように計画しているのか分からない。あなたは最初にTransformBlocksのActionBlocksを話します。私が読んだところでは、ActionBlockはアーキテクチャ全体の「終点」ではないでしょうか? –
1.これは2番目の段落で説明しました: 'TransformBlock'は、項目を並行して処理して、同時にそれらを返すことができません。いずれか一方を行うことはできますが、両方を行うことはできません。 – svick