私はTPL DataFlowを使用するTaskプロセッサを作成しています。私はプロデューサーの消費者モデルに従います。プロデューサーではしばらくの間処理されるべきアイテムがいくつか生産され、消費者は新しいアイテムが到着するのを待っています。ここに私のコードはあります:例外処理を使用してDataFlow Meshを終了しない方法を作成するには?
async Task Main()
{
var runner = new Runner();
CancellationTokenSource cts = new CancellationTokenSource();
Task runnerTask = runner.ExecuteAsync(cts.Token);
await Task.WhenAll(runnerTask);
}
public class Runner
{
public async Task ExecuteAsync(CancellationToken cancellationToken) {
var random = new Random();
ActionMeshProcessor processor = new ActionMeshProcessor();
await processor.Init(cancellationToken);
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromSeconds(1)); // wait before enqueuing more
int[] items = GetItems(random.Next(3, 7));
await processor.ProcessBlockAsync(items);
}
}
private int[] GetItems(int count)
{
Random randNum = new Random();
int[] arr = new int[count];
for (int i = 0; i < count; i++)
{
arr[i] = randNum.Next(10, 20);
}
return arr;
}
}
public class ActionMeshProcessor
{
private TransformBlock<int, int> Transformer { get; set; }
private ActionBlock<int> CompletionAnnouncer { get; set; }
public async Task Init(CancellationToken cancellationToken)
{
var options = new ExecutionDataflowBlockOptions
{
CancellationToken = cancellationToken,
MaxDegreeOfParallelism = 5,
BoundedCapacity = 5
};
this.Transformer = new TransformBlock<int, int>(async input => {
await Task.Delay(TimeSpan.FromSeconds(1)); //donig something complex here!
if (input > 15)
{
throw new Exception($"I can't handle this number: {input}");
}
return input + 1;
}, options);
this.CompletionAnnouncer = new ActionBlock<int>(async input =>
{
Console.WriteLine($"Completed: {input}");
await Task.FromResult(0);
}, options);
this.Transformer.LinkTo(this.CompletionAnnouncer);
await Task.FromResult(0); // what do I await here?
}
public async Task ProcessBlockAsync(int[] arr)
{
foreach (var item in arr)
{
await this.Transformer.SendAsync(item); // await if there are no free slots
}
}
}
上記の条件チェックを追加して、例外をスローして例外的なケースを模倣しました。
ここに私の質問は以下のとおりです。
私がダウンしてメッシュ全体をもたらすことなく、上記のメッシュで例外を処理することができる最善の方法は何ですか?
終わりのないDataFlowメッシュを初期化/開始/継続する良い方法はありますか?
完了はどこですか?
私はそれが標準の同期のコンストラクタかもしれないあなたのinit
での非同期何もないthis similar question
、おかげ@JStewardが、私は介してデバッグしますと、私はどうかを確認:
はその後、あなたのサンプルに基づいて完成のための最も可能性の高い場所には、ループの外で運転あなたの処理であり、他のエッジケースにヒットして更新! – Amit