2016-02-25 3 views
9

バインドされた容量のバッチブロックを作成し、triggerBatchを呼び出して新しいアイテムをポストすると、トリガーのバッチ実行中に新しいアイテムが失敗します。予期しない動作 - TPL DataFlow BatchBlockは、TriggerBatchの実行中にアイテムを拒否します。

呼び出しトリガーバッチ(X回ごと)は、受信データストリームが一時停止または遅くなった場合に、データがブロック内で長すぎないようにするために行われます。

次のコードは、「失敗後」イベントを出力します。たとえば :

public static void Main(string[] args) 
    { 
     var batchBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions() { BoundedCapacity = 10000000 }); 
     var actionBlock = new ActionBlock<int[]>(x => ProcessBatch(x), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 }); 
     batchBlock.LinkTo(actionBlock); 

     var producerTask = Task.Factory.StartNew(() => 
     { 
      //Post 10K Items 
      for (int i = 0; i < 10000; i++) 
      { 
       var postResult = batchBlock.Post(i); 
       if (!postResult) 
        Console.WriteLine("Failed to Post"); 
      } 
     }); 

     var triggerBatchTask = Task.Factory.StartNew(() => 
      {      
       //Trigger Batch.. 
       for (int i = 0; i < 1000000; i++) 
        batchBlock.TriggerBatch(); 
      }); 

     producerTask.Wait(); 
     triggerBatchTask.Wait(); 
    } 

    public static void ProcessBatch(int[] batch) 
    { 
     Console.WriteLine("{0} - {1}", batch.First(), batch.Last()); 
    } 

*このシナリオはbatchBlockが制限された場合にのみ再現可能であることに注意してください。

は、私が何かをしないのですか、それはbatchBlockの問題ですか? BatchBlockが実際にアイテムを拒否していない

答えて

3

、それを延期しようとします。 Post()の場合を除いて、延期はオプションではありません。これを修正する簡単な方法は、batchBlock.Post(i)の代わりにawait batchBlock.SendAsync(i)を使用することです(これはTask.Factory.StartNew(() =>Task.Run(async() =>に変更する必要があることを意味します)。

どうしてですか? the source codeによれば、BatchBlockがバインドされている場合、TriggerBatch()は非同期に処理され、処理されている間は新しい項目は受け入れられません。いずれの場合においても

、あなたはPost()は常にブロックがいっぱいの場合、Post()falseを返します、有界ブロックにtrueを返すことを期待すべきではありません。

+0

これは – i3arnon

+0

。一方、私は失敗を受け入れる別のブロックを導入することで、別のソリューションを使用していて、最終的に私は両方のブロックにシリアルにtriggerbatch呼んでいる...私にとって非常に驚くべきことでした。あなたへ は解決策を提案する - 待つと非同期では、多くのタスクを無制限に作成されるイベントの巨大なバーストを持っている場合、これはメモリの問題のうち、発生する可能性があります、各入ってくるアイテムを処理するタスクを作成します。 –

+0

@AlYarosいいえ、そうではありません。アイテムが受け入れられれば、キャッシュされた 'Task'が得られるので、そこには割り当てはありません。アイテムが延期されている場合、表示されたコードは承認されるまで新しいアイテムを追加しません。あなたの実際のコードで 'await'が問題を引き起こすならば、IMOはそれらを修正することができなければなりません。そうしなくても問題が発生するでしょう。 – svick

関連する問題