2017-04-24 8 views
3

私はTPL DataFlowを使用するTaskプロセッサを作成しています。私はプロデューサーの消費者モデルに従います。プロデューサーではしばらくの間処理されるべきアイテムがいくつか生産され、消費者は新しいアイテムが到着するのを待っています。ここに私のコードはあります:例外処理を使用してDataFlow Meshを終了しない方法を作成するには?

async Task Main() 
{ 
    var runner = new Runner(); 
    CancellationTokenSource cts = new CancellationTokenSource(); 
    Task runnerTask = runner.ExecuteAsync(cts.Token); 

    await Task.WhenAll(runnerTask); 
} 

public class Runner 
{ 
    public async Task ExecuteAsync(CancellationToken cancellationToken) { 
     var random = new Random(); 

     ActionMeshProcessor processor = new ActionMeshProcessor(); 
     await processor.Init(cancellationToken); 

     while (!cancellationToken.IsCancellationRequested) 
     { 
      await Task.Delay(TimeSpan.FromSeconds(1)); // wait before enqueuing more 

      int[] items = GetItems(random.Next(3, 7)); 

      await processor.ProcessBlockAsync(items); 
     } 
    } 

    private int[] GetItems(int count) 
    { 
     Random randNum = new Random(); 

     int[] arr = new int[count]; 
     for (int i = 0; i < count; i++) 
     { 
      arr[i] = randNum.Next(10, 20); 
     } 

     return arr; 
    } 
} 

public class ActionMeshProcessor 
{ 
    private TransformBlock<int, int> Transformer { get; set; } 
    private ActionBlock<int> CompletionAnnouncer { get; set; } 

    public async Task Init(CancellationToken cancellationToken) 
    { 
     var options = new ExecutionDataflowBlockOptions 
     { 
      CancellationToken = cancellationToken, 
      MaxDegreeOfParallelism = 5, 
      BoundedCapacity = 5 
     }; 


     this.Transformer = new TransformBlock<int, int>(async input => { 

      await Task.Delay(TimeSpan.FromSeconds(1)); //donig something complex here! 

      if (input > 15) 
      { 
       throw new Exception($"I can't handle this number: {input}"); 
      } 

      return input + 1; 
     }, options); 

     this.CompletionAnnouncer = new ActionBlock<int>(async input => 
     { 
      Console.WriteLine($"Completed: {input}"); 

      await Task.FromResult(0); 
     }, options); 

     this.Transformer.LinkTo(this.CompletionAnnouncer); 

     await Task.FromResult(0); // what do I await here? 
    } 

    public async Task ProcessBlockAsync(int[] arr) 
    { 
     foreach (var item in arr) 
     { 
      await this.Transformer.SendAsync(item); // await if there are no free slots 
     }  
    } 
} 

上記の条件チェックを追加して、例外をスローして例外的なケースを模倣しました。

ここに私の質問は以下のとおりです。

  • 私がダウンしてメッシュ全体をもたらすことなく、上記のメッシュで例外を処理することができる最善の方法は何ですか?

  • 終わりのないDataFlowメッシュを初期化/開始/継続する良い方法はありますか?

  • 完了はどこですか?

私はそれが標準の同期のコンストラクタかもしれないあなたのinitでの非同期何もないthis similar question

答えて

3

例外

にして見てきました。メッシュ内の例外を処理するには、ブロックに与えたラムダを単純なtry catchでメッシュを落とさないでください。メッシュの結果をフィルタリングするか、次のブロックで結果を無視することで、そのケースを処理できます。以下はフィルタリングの例です。 intの単純なケースでは、int?を使用して、nullの値を除外するか、必要に応じて任意の種類の魔法の指標値を設定できます。実際に参照型を渡す場合は、nullをプッシュするか、リンク上の述語で調べられるようにデータ項目をダーティにマークすることができます。

public class ActionMeshProcessor { 
    private TransformBlock<int, int?> Transformer { get; set; } 
    private ActionBlock<int?> CompletionAnnouncer { get; set; } 

    public ActionMeshProcessor(CancellationToken cancellationToken) { 
     var options = new ExecutionDataflowBlockOptions { 
      CancellationToken = cancellationToken, 
      MaxDegreeOfParallelism = 5, 
      BoundedCapacity = 5 
     }; 


     this.Transformer = new TransformBlock<int, int?>(async input => { 
      try { 
       await Task.Delay(TimeSpan.FromSeconds(1)); //donig something complex here! 

       if (input > 15) { 
        throw new Exception($"I can't handle this number: {input}"); 
       } 

       return input + 1; 
      } catch (Exception ex) { 
       return null; 
      } 

     }, options); 

     this.CompletionAnnouncer = new ActionBlock<int?>(async input => 
     { 
      if (input == null) throw new ArgumentNullException("input"); 

      Console.WriteLine($"Completed: {input}"); 

      await Task.FromResult(0); 
     }, options); 

     //Filtering 
     this.Transformer.LinkTo(this.CompletionAnnouncer, x => x != null); 
     this.Transformer.LinkTo(DataflowBlock.NullTarget<int?>()); 
    } 

    public async Task ProcessBlockAsync(int[] arr) { 
     foreach (var item in arr) { 
      await this.Transformer.SendAsync(item); // await if there are no free slots 
     } 
    } 
} 

あなたがシャットダウンしたいだけの時間あなたメッシュ厥と仮定すると、あなたのプロセッサからComplete()Completionを公開し、awaitに完了アプリのshutsdownをそれらを使用することができます

完成。また、あなたのリンクを通って補完を正しく伝えてください。これは良い探している

public async Task ExecuteAsync(CancellationToken cancellationToken) { 
    var random = new Random(); 

    ActionMeshProcessor processor = new ActionMeshProcessor(); 
    await processor.Init(cancellationToken); 

    while (!cancellationToken.IsCancellationRequested) { 
     await Task.Delay(TimeSpan.FromSeconds(1)); // wait before enqueuing more 

     int[] items = GetItems(random.Next(3, 7)); 

     await processor.ProcessBlockAsync(items); 
    } 
    //asuming you don't intend to throw from cancellation 
    processor.Complete(); 
    await processor.Completion(); 

} 
+0

、おかげ@JStewardが、私は介してデバッグしますと、私はどうかを確認:

//Filtering this.Transformer.LinkTo(this.CompletionAnnouncer, new DataflowLinkOptions() { PropagateCompletion = true }, x => x != null); this.Transformer.LinkTo(DataflowBlock.NullTarget<int?>()); } public void Complete() { Transformer.Complete(); } public Task Completion { get { return CompletionAnnouncer.Completion; } } 

はその後、あなたのサンプルに基づいて完成のための最も可能性の高い場所には、ループの外で運転あなたの処理であり、他のエッジケースにヒットして更新! – Amit

関連する問題