10

私はかなりの典型的なプロデューサ/コンシューマモデルをさまざまなタスクで実行します。これはTPL Dataflowの仕事ですか?

タスク1:バイナリファイルからバイト[]のバッチを読み取り、バイト配列の各コレクションについて新しいタスクを開始します。 (この操作はメモリ管理目的でバッチ処理されます)。

タスク2-n:これらはワーカー・タスクであり、それぞれバイト配列の渡されたコレクション(Tasks1から)で動作し、バイト配列を逆直列化し、特定の基準でソートした後、 (各バイト配列はそのようなオブジェクトにデシリアライズされます)。

タスク(n + 1)このタスクの仕事は、タスク1からどのように起きたのと同じ順序で並行ディクショナリに格納されているコレクションをマージするためです。私はcollectionID(これはint型であり、Task1内の新しいコレクションごとにインクリメントされます)をTask1からこのタスクに渡して渡すことで実現します。このタスクは基本的に、次の予想されるcollectionIDがすでにコンカレント・ディクショナリに格納されているかどうかをチェックし、そうであれば取り出して最終キューに追加し、コンカレント・ディクショナリ内の次のコレクションをチェックします。

ここで私が読んだことや私が見たビデオから、TPL Dataflowはそのようなプロデューサ/コンシューマモデルにとって最適な候補になるかもしれないと私は思っています。私は、TPL Dataflowを使ったことがないので、設計を考案できるようにはならないと思われます。スループットとレイテンシの点では、このライブラリはタスクまでですか?私は現在、250万バイトの配列を処理し、結果として得られるコレクションのオブジェクトを毎秒処理します。 TPLデータフローが簡素化するのに役立つか?私は特に、次の質問に対する答えに興味があります。TPLデータフローは、作業者タスクを生成し、作業を完了した後に再結合するとき、Task1の収集バッチの順序を保持できますか?それは物事を最適化しますか?全体の構造をプロファイリングすると、スピニングやあまりにも多くのコンカレントコレクションが原因でかなりの時間が浪費されているように感じます。

アイデア、考え方は?

答えて

12

EDIT:私は非常に間違っていました。 TransformBlockは、パラレル化のために構成されていても、同じ順序でアイテムを返します。そのため、元の回答のコードはまったく役に立たず、代わりにTransformBlockを使用することができます。


オリジナルの答え:AsOrdered()とPLINQ:私の知る限りは、.NETで唯一の並列性構築物は、彼らが入って来た順に処理アイテムを返すことができます知っているよう

。しかし、PLINQはあなたが望むものに合っていないようです。

TPL Dataflowは、一方でよく適合しますが、並列処理をサポートし、同時にアイテムを返すブロックはありません(TransformBlockは両方をサポートしていますが、同時)。幸運なことに、データフローブロックは合成可能性を念頭に設計されているので、それを行う独自のブロックを構築できます。

しかし、まず、結果をどのように整理するかを理解する必要があります。あなたが示唆したように、いくつかの同期メカニズムと並行して辞書を使用することは確かに有効です。しかし、より単純な解決法があると思います。Taskのキューを使用してください。出力タスクでは、Taskをデキューし、(非同期に)完了するのを待ちます。非同期に完了すると、結果が送信されます。キューが空の場合にはまだ同期が必要ですが、賢明に使用するキューを選択すると空きが得られます。

したがって、一般的な考えは次のようなものです:私たちが書いているのは、入力と出力があるIPropagatorBlockです。カスタムIPropagatorBlockを作成する最も簡単な方法は、入力を処理する1つのブロックと、結果を生成し、DataflowBlock.Encapsulate()を使用して1つのブロックとして扱う別のブロックを作成することです。

入力ブロックは正しい順序で入力項目を処理する必要があるため、そこで並列化は行われません。新しいTask(実際には、Taskの結果を後で設定できるように、TaskCompletionSource)を作成し、それをキューに追加して処理のためにアイテムを送信し、正しい結果を設定する何らかの方法を使用します。Task 。このブロックを何かにリンクする必要はないので、ActionBlockを使用できます。

出力ブロックは、キューからTask秒かかることがあり、非同期にそれらを待ってからそれらを送信する必要があります。しかし、すべてのブロックにはキューが埋め込まれており、デリゲートを取るブロックには非同期待機が組み込まれているため、これは非常に簡単です:new TransformBlock<Task<TOutput>, TOutput>(t => t)。このブロックは、キューとしても出力ブロックとしても機能します。このため、私たちはどんな同期にも対処する必要はありません。

パズルの最後の部分は、実際には並行して項目を処理しています。このために、別のActionBlockを使用することができます。今回はMaxDegreeOfParallelismと設定します。それは入力を受け取り、それを処理し、正しいTaskの結果を待ち行列に入れるでしょう。

が一緒に入れて、それは次のようになります。

public static IPropagatorBlock<TInput, TOutput> 
    CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> transform) 
{ 
    var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t); 

    var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
     tuple => tuple.Item2(transform(tuple.Item1)), 
     new ExecutionDataflowBlockOptions 
     { 
      MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded 
     }); 

    var enqueuer = new ActionBlock<TInput>(
     async item => 
     { 
      var tcs = new TaskCompletionSource<TOutput>(); 
      await processor.SendAsync(
       new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult)); 
      await queue.SendAsync(tcs.Task); 
     }); 

    enqueuer.Completion.ContinueWith(
     _ => 
     { 
      queue.Complete(); 
      processor.Complete(); 
     }); 

    return DataflowBlock.Encapsulate(enqueuer, queue); 
} 

をそんなに話した後、それはコードのかなり少量だ、私は思います。

パフォーマンスが気になるので、このコードを微調整する必要があります。たとえば、オーバーサブスクリプションを避けるために、のブロックのMaxDegreeOfParallelismEnvironment.ProcessorCountのように設定すると意味があります。また、待ち時間がスループットよりも重要である場合は、同じブロックのMaxMessagesPerTaskを1(または別の小さい番号)に設定して、アイテムの処理が終了した時点でただちに出力に送信することが理にかなっています。

また、着信アイテムを調整する場合は、BoundedCapacityenqueuerと設定できます。

+0

私は最初に消化して試してみたいと思っています。それらのためにたくさんありがとう、それは少なくともupvoteに値します;-)私はそれらのアイデアと遊ぶことができ、私は戻ってきます。キューイングタスクは非常に意味があり、なぜ私はそれを早く取得しなかったのだろうかと思います。 –

+0

ok私はあなたのポストを通過し、TPL Dataflowで読んで、あなたの提案されたソリューションを完全に理解するための質問をいくつかここで組み合わせます:(1)Transformblock が既に存在する場合、カスタムIPropagatorBlockとIDataflowBlock.Encapsulate (2)ブロックのリンクを実際にどのように計画しているのか分からない。あなたは最初にTransformBlocksのActionBlocksを話します。私が読んだところでは、ActionBlockはアーキテクチャ全体の「終点」ではないでしょうか? –

+1

1.これは2番目の段落で説明しました: 'TransformBlock'は、項目を並行して処理して、同時にそれらを返すことができません。いずれか一方を行うことはできますが、両方を行うことはできません。 – svick

関連する問題