2012-11-22 4 views
20

両方の変換ブロックが完了したときにコードが完了するコードを再書き込みするにはどうすればよいですか?完了とは、完了とマークされ、「アウトキュー」が空であることを意味します。TPLデータフロー、すべてのソースデータブロックが完了したときのみ完了を保証

public Test() 
    { 
     broadCastBlock = new BroadcastBlock<int>(i => 
      { 
       return i; 
      }); 

     transformBlock1 = new TransformBlock<int, string>(i => 
      { 
       Console.WriteLine("1 input count: " + transformBlock1.InputCount); 
       Thread.Sleep(50); 
       return ("1_" + i); 
      }); 

     transformBlock2 = new TransformBlock<int, string>(i => 
      { 
       Console.WriteLine("2 input count: " + transformBlock1.InputCount); 
       Thread.Sleep(20); 
       return ("2_" + i); 
      }); 

     processorBlock = new ActionBlock<string>(i => 
      { 
       Console.WriteLine(i); 
      }); 

     //Linking 
     broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true }); 
     broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true }); 
     transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true }); 
     transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true }); 
    } 

    public void Start() 
    { 
     const int numElements = 100; 

     for (int i = 1; i <= numElements; i++) 
     { 
      broadCastBlock.SendAsync(i); 
     } 

     //mark completion 
     broadCastBlock.Complete(); 

     processorBlock.Completion.Wait(); 

     Console.WriteLine("Finished"); 
     Console.ReadLine(); 
    } 
} 

Iは、各ブロックを変換するための入力バッファカウントを追加して、コードを編集しました。明らかに、100のすべての項目が各変換ブロックにストリーミングされます。しかし、変換ブロックの1つが終了すると、プロセッサブロックはこれ以上アイテムを受け入れず、代わりに不完全な変換ブロックの入力バッファが入力バッファをフラッシュします。

+0

トランスフォームブロックがブロードキャストブロックからすべてのメッセージを受信しない可能性があることに注意してください。彼らは_latest_メッセージだけを受信します。ブロードキャストブロックが、トランスフォームブロックがそれらを受信するよりも速くメッセージを提供される場合、トランスフォームブロックはメッセージを欠いてしまう。また、メッセージの順序などを保証したい場合には 'SendAsync(i)'に 'await'する必要があります。 – urbanhusky

答えて

24

問題がcasperOneが彼に言った、まさにです回答。最初の変換ブロックが完了すると、プロセッサブロックは「仕上げモード」に入ります。入力キュー内の残りのアイテムは処理されますが、新しいアイテムは受け入れられません。

しかし二つにあなたのプロセッサブロックを分割するよりも簡単修正プログラムがあります:PropagateCompletionを設定し、代わりに完全な変換ブロックの両方の際に手動でプロセッサブロックの終了を設定しない:

Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion) 
    .ContinueWith(_ => processorBlock.Complete()); 
+0

私が探していたもの。 Task.WhenAllが待っている仕事、私の過失を返すことを認識していませんでした。 –

+0

私は非常に同じ、おそらくは遅すぎる必要がありますが、Task.WhenAllをどこに追加する必要があるかについての更新を投稿できますか? –

+0

@AttilaHajdrikおそらくあなたのデータフロー設定コードの終わりで、あなたの 'LinkTo'の近くにあります。 – svick

22

ブロックをリンクするためにLinkTo methodに電話をかけ、変換ブロックで待ち時間が異なると、毎回PropagateCompletion propertyを設定するという問題があります。 IDataflowBlock interface(強調鉱山)のComplete methodのドキュメントから

:それはを受け入れず、それ以上のメッセージを生成したり、それ以上延期メッセージを消費してはならないことIDataflowBlockへ

シグナル。あなたがTransformBlock<TInput, TOutput>インスタンスのそれぞれであなたの待機時間をずらすので

は、transformBlock2(20ミリ秒を待っている)は、(50ミリ秒を待っている)transformBlock1前に終了します。 transformBlock2が最初に完了し、processorBlockに信号が送信され、「私は何も受け入れていません」と表示されます(まだtransformBlock1はすべてのメッセージを生成していません)。

transformBlock1より前のtransformBlock1の処理は、の絶対にではありません。スレッドプール(デフォルトのスケジューラーを使用していると仮定した場合)は、タスクを別の順序で処理します(ただし、20ミリ秒の項目が完了するとキューから作業を奪う可能性があります)。

あなたのパイプラインは次のようになります。これを回避するために、

  broadcastBlock 
     /   \ 
transformBlock1 transformBlock2 
      \   /
      processorBlock 

、あなたはこのようになりますパイプラインを持つようにしたい:

ちょうど別の2を作成することによって達成される
  broadcastBlock 
     /   \ 
transformBlock1 transformBlock2 
      |    | 
processorBlock1 processorBlock2 

ActionBlock<TInput>インスタンスのように:

// The action, can be a method, makes it easier to share. 
Action<string> a = i => Console.WriteLine(i); 

// Create the processor blocks. 
processorBlock1 = new ActionBlock<string>(a); 
processorBlock2 = new ActionBlock<string>(a); 


// Linking 
broadCastBlock.LinkTo(transformBlock1, 
    new DataflowLinkOptions { PropagateCompletion = true }); 
broadCastBlock.LinkTo(transformBlock2, 
    new DataflowLinkOptions { PropagateCompletion = true }); 
transformBlock1.LinkTo(processorBlock1, 
    new DataflowLinkOptions { PropagateCompletion = true }); 
transformBlock2.LinkTo(processorBlock2, 
    new DataflowLinkOptions { PropagateCompletion = true }); 

あなたは両方を待つ必要がありますプロセッサブロックの代わりに、ただ1:ここ

Task.WhenAll(processorBlock1.Completion, processorBlock2.Completion).Wait(); 

非常に重要な注意点。 ActionBlock<TInput>を作成する場合、デフォルトでは、それに渡されたExecutionDataflowBlockOptionsインスタンスのMaxDegreeOfParallelism propertyが1に設定されます。

これは、ActionBlock<TInput>に渡すAction<T> delegateへの呼び出しがスレッドセーフであり、一度に1つだけが実行されることを意味します。

あなたは今、同じAction<T>デリゲートを指す ActionBlock<TInput>のインスタンスを持っているので、あなたがスレッドの安全性を保証するものではありません。

メソッドがスレッドセーフな場合は、何もする必要はありません(ブロックする理由がないため、MaxDegreeOfParallelismプロパティをDataflowBlockOptions.Unboundedに設定できます)。

ではなくスレッドセーフで、保証する必要がある場合は、lock statementのような従来の同期プリミティブを使用する必要があります。この場合

、あなたは(Console classWriteLine methodは、スレッドセーフであるとして、それを明確に、必要ではないですが)のようなので、それを行うだろう:

// The lock. 
var l = new object(); 

// The action, can be a method, makes it easier to share. 
Action<string> a = i => { 
    // Ensure one call at a time. 
    lock (l) Console.WriteLine(i); 
}; 

// And so on... 
+0

長い回答ありがとうございますが、TPLデータフローに直接適用され、とても簡潔で簡単な解決策を提供するのでsvickの答えを選択しました。 –

+2

両方のアクションブロックに同じ['ExclusiveScheduler'](http://msdn.microsoft.com/en-us/library/system.threading.tasks.concurrentexclusiveschedulerpair.exclusivescheduler)を使用すると、簡単にロックを回避できます。 – svick

7

svickのに加え答え:PropagateCompletionオプションを使用した場合の動作と一致するためには、先行ブロックにフォルトが発生した場合に備えて例外を転送する必要もあります。次のような拡張メソッドは、同様にそのの世話をする:

public static void CompleteWhenAll(this IDataflowBlock target, params IDataflowBlock[] sources) { 
    if (target == null) return; 
    if (sources.Length == 0) { target.Complete(); return; } 
    Task.Factory.ContinueWhenAll(
     sources.Select(b => b.Completion).ToArray(), 
     tasks => { 
      var exceptions = (from t in tasks where t.IsFaulted select t.Exception).ToList(); 
      if (exceptions.Count != 0) { 
       target.Fault(new AggregateException(exceptions)); 
      } else { 
       target.Complete(); 
      } 
     } 
    ); 
} 
0

他の回答には、ブロックが二つ以上のソースを持っていたときに、なぜPropagateCompletion = trueを台無しについて非常に明確です。

問題を簡単に解決するには、オープンソースライブラリDataflowExを見て、よりスマートな完了規則を組み込んだこの種の問題を解決したい場合があります。 (内部でリンクするTPL Dataflowを使用しますが、複雑な補完伝播をサポートします)実装はWhenAllに似ていますが、動的リンクの追加も処理します。

コードを少し変更してすべて動作させるようにしました。 DataflowExを使用して:

public CompletionDemo1() 
{ 
    broadCaster = new BroadcastBlock<int>(
     i => 
      { 
       return i; 
      }).ToDataflow(); 

    transformBlock1 = new TransformBlock<int, string>(
     i => 
      { 
       Console.WriteLine("1 input count: " + transformBlock1.InputCount); 
       Thread.Sleep(50); 
       return ("1_" + i); 
      }); 

    transformBlock2 = new TransformBlock<int, string>(
     i => 
      { 
       Console.WriteLine("2 input count: " + transformBlock2.InputCount); 
       Thread.Sleep(20); 
       return ("2_" + i); 
      }); 

    processor = new ActionBlock<string>(
     i => 
      { 
       Console.WriteLine(i); 
      }).ToDataflow(); 

    /** rather than TPL linking 
     broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true }); 
     broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true }); 
     transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true }); 
     transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true }); 
    **/ 

    //Use DataflowEx linking 
    var transform1 = transformBlock1.ToDataflow(); 
    var transform2 = transformBlock2.ToDataflow(); 

    broadCaster.LinkTo(transform1); 
    broadCaster.LinkTo(transform2); 
    transform1.LinkTo(processor); 
    transform2.LinkTo(processor); 
} 

完全なコードはhereです。

免責事項:私はMITライセンスで公開されているDataflowExの著者です。

+0

Gridsumで働く場合に開示していただけますか?私の質問では、TPL Dataflowの回答が必要であることが明示されていました。私はこの問題に対してサードパーティのソリューションを使用したくありませんでした。ありがとう。 –

+1

はい、私はGridsumのために働いています。しかし、図書館は完全に無料でオープンソースなので、私はそれがあなたを助けるかもしれないと思った。商業的な考え方は全くありません。必要なものがTPLデータフローの内部メカニズムに関するものであれば、私の答えは無視してください。しかし、誰かが*解決策を必要とするならば、その答えはその価値があります。ありがとう:) – Dodd

+0

答えをもう少し詳しく更新しました。免責事項も追加されました。 – Dodd

関連する問題