両方の変換ブロックが完了したときにコードが完了するコードを再書き込みするにはどうすればよいですか?完了とは、完了とマークされ、「アウトキュー」が空であることを意味します。TPLデータフロー、すべてのソースデータブロックが完了したときのみ完了を保証
public Test()
{
broadCastBlock = new BroadcastBlock<int>(i =>
{
return i;
});
transformBlock1 = new TransformBlock<int, string>(i =>
{
Console.WriteLine("1 input count: " + transformBlock1.InputCount);
Thread.Sleep(50);
return ("1_" + i);
});
transformBlock2 = new TransformBlock<int, string>(i =>
{
Console.WriteLine("2 input count: " + transformBlock1.InputCount);
Thread.Sleep(20);
return ("2_" + i);
});
processorBlock = new ActionBlock<string>(i =>
{
Console.WriteLine(i);
});
//Linking
broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public void Start()
{
const int numElements = 100;
for (int i = 1; i <= numElements; i++)
{
broadCastBlock.SendAsync(i);
}
//mark completion
broadCastBlock.Complete();
processorBlock.Completion.Wait();
Console.WriteLine("Finished");
Console.ReadLine();
}
}
Iは、各ブロックを変換するための入力バッファカウントを追加して、コードを編集しました。明らかに、100のすべての項目が各変換ブロックにストリーミングされます。しかし、変換ブロックの1つが終了すると、プロセッサブロックはこれ以上アイテムを受け入れず、代わりに不完全な変換ブロックの入力バッファが入力バッファをフラッシュします。
トランスフォームブロックがブロードキャストブロックからすべてのメッセージを受信しない可能性があることに注意してください。彼らは_latest_メッセージだけを受信します。ブロードキャストブロックが、トランスフォームブロックがそれらを受信するよりも速くメッセージを提供される場合、トランスフォームブロックはメッセージを欠いてしまう。また、メッセージの順序などを保証したい場合には 'SendAsync(i)'に 'await'する必要があります。 – urbanhusky