3

の終わりに向かってそれぞれのために定期的なように振る舞う:500+レコードがあると仮定するとParallel.ForEachが、私はこのような何かを実行したときに、私はこの問題を抱えてい反復

Parallel.ForEach(dataTable.AsEnumerable(), row => 
{ 
    //do processing 
} 

はパラレルたら870を言います.ForEachは850を完了すると、一度に1つの操作しか実行されていないようです。それは850の操作を非常に速く完了しましたが、それが反復の終わりに近づくと非常に遅くなり、それぞれのために通常のように機能しているようです。私は2000レコードを試してみました。

私のコードで何か問題がありますか?ご提案ください。以下は

私は間違った例を掲載、私は

申し訳ありませんを使用していたコードです。これは正しいコードです:

Task newTask = Task.Factory.StartNew(() => 
{ 
    Parallel.ForEach(dtResult.AsEnumerable(), dr => 
    { 
     string extractQuery = ""; 
     string downLoadFileFullName = ""; 
     lock (foreachObject) 
     { 

      string fileName = extractorConfig.EncodeFileName(dr); 
      extractQuery = extractorConfig.GetExtractQuery(dr); 
      if (string.IsNullOrEmpty(extractQuery)) throw new Exception("Extract Query not found. Please check the configuration"); 

      string newDownLoadPath = CommonUtil.GetFormalizedDataPath(sDownLoadPath, uKey.CobDate); 
      //create folder if it doesn't exist 
      if (!Directory.Exists(newDownLoadPath)) Directory.CreateDirectory(newDownLoadPath); 
      downLoadFileFullName = Path.Combine(newDownLoadPath, fileName); 
     } 
     Interlocked.Increment(ref index); 

     ExtractorClass util = new ExtractorClass(SourceDbConnStr); 
     util.LoadToFile(extractQuery, downLoadFileFullName); 
     Interlocked.Increment(ref uiTimerIndex); 
    }); 
}); 
+3

私は、これはコメントまたは回答する必要がありますが、私はそれを指摘する必要があると感じた場合はわからない、完全なコードブロック –

+2

を提供してください: 'DataTable'はスレッドセーフタイプではありません。だから、もしあなたの '//処理している'コードが(まるで個々の行内のセルにさえ)何らかの修正を含んでいれば、あなたは痛みの世界を求めている、と私は恐れている。 –

+0

DataTableのすべての行について、データベースへの呼び出しを行い、データをフェッチしてファイルにロードします。その抽出プロセスのような。データベースからデータを取得し、ファイルに抽出します。 –

答えて

3

私の推測:

これはからの潜在的なIOの高度持つようになります:ディスク にDBへ

  • データベース+ディスク
  • ネットワーク通信とバック
  • ライティング結果を

したがって、IOを待つのに多くの時間が費やされます。私の推測では、より多くのスレッドがミックスに追加され、IOがさらに強調されているため、待ち時間が悪化するだけです。たとえば、ディスクには1組のヘッドしかないため、同時に書き込むことはできません。並行して書き込もうとするスレッドが多数ある場合は、パフォーマンスが低下します。

使用しているスレッドの最大数を制限してみてください。

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 }; 

Parallel.ForEach(dtResult.AsEnumerable(), options, dr => 
{ 
    //Do stuff 
}); 

更新

あなたのコード編集した後、私は変更のカップルを持って、次のことをお勧め:

  • スレッドの最大数を減らしてください - これは実験的に可能です。
  • ディレクトリのチェックと作成は一度だけ実行してください。

コード:

private static bool isDirectoryCreated; 

//... 

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 }; 

Parallel.ForEach(dtResult.AsEnumerable(), options, dr => 
{ 
    string fileName, extractQuery, newDownLoadPath; 

    lock (foreachObject) 
    { 
     fileName = extractorConfig.EncodeFileName(dr); 

     extractQuery = extractorConfig.GetExtractQuery(dr); 

     if (string.IsNullOrEmpty(extractQuery)) 
      throw new Exception("Extract Query not found. Please check the configuration"); 

     newDownLoadPath = CommonUtil.GetFormalizedDataPath(sDownLoadPath, uKey.CobDate); 

     if (!isDirectoryCreated) 
     { 
      if (!Directory.Exists(newDownLoadPath)) 
       Directory.CreateDirectory(newDownLoadPath); 

      isDirectoryCreated = true; 
     } 
    } 

    string downLoadFileFullName = Path.Combine(newDownLoadPath, fileName); 

    Interlocked.Increment(ref index); 

    ExtractorClass util = new ExtractorClass(SourceDbConnStr); 
    util.LoadToFile(extractQuery, downLoadFileFullName); 

    Interlocked.Increment(ref uiTimerIndex); 
}); 
+0

@chibacity私は提案を試みた。私は同じことを実行すると思いますが、コードの修正を指摘していただきありがとうございます。 –

+0

@bunnyスレッドの数を制限しても改善されませんでしたか? –

+0

@chibacity実際には少し減速しました。 –

2

関連するコードなしで詳細を伝えるのは難しいですが、これは一般的に予想される動作です。 .NETはすべてのプロセッサが均等にビジー状態になるようにタスクをスケジュールしようとします。

これは、すべてのタスクに同じ時間がかかるわけではありません。結局、一部のプロセッサは動作し、一部は動作しませんし、作業を再配布するのはコストがかかり、必ずしも有益ではありません。

PLinqで使用されているロードバランシングの詳細はわかりませんが、最終行はこの動作を完全に防ぐことができないということです。

1

次の2つのスレッドを並列処理を制限することを想定します。 Parallel.ForEachには、少なくとも2つの可能な方法があります。 1つの方法は、2つのスレッドが開始され、それぞれが完了するために項目の半分が与えられることです。したがって、850のアイテムがある場合、実際にはスレッド1に最初の425アイテムが与えられ、スレッド2には425アイテムの2番目のブロックが与えられます。両方のスレッドが動作するようになりました。処理される項目の順序は、[0、425、426、1、2、427、3、428、429、4、...]のようになります。

それは、スレッドの一方が他方にはよりもはるかに高速の項目のそのグループを完了します(おそらく、実際には)ということは非常に可能です。それは仕事ができる

もう一つの方法は、何のアイテムが処理する残っていないがなくなるまで繰り返し、2つのスレッドを開始し、それぞれのグラブにリストから項目、プロセス、それを持っているし、次の項目を取得することです。この場合、処理される項目の順序は、[0,1,2,4,3,6,5、...]のようになります。

最初の例では、各スレッドは、プロセスにアイテムのブロックが与えられます。 2番目のケースでは、各スレッドは、アイテムが残っていない限り、共通ブロックのアイテムを処理します。

はバリエーションがありますが、これらは複数のスレッド間で作業を分割するには、2つの主要な方法です。各自にアイテムのグループを与えたり、各スレッドが処理を終えたら、次のアイテムを求めるようにしてください。

Parallel.ForEachは、最初の方法で実装されている:各スレッドが処理するアイテムの独自のグループが与えられます。これとは逆の方法では、アイテムのリストを共有キューのように扱わなければならず、その結果、同期オーバーヘッドが発生するため、オーバーヘッドが増えます。

+1

'Parallel.ForEach'はあなたの提案する方法では動作しません。すなわち入力リストは分割され、最初は専用のスレッドに分割されます。実際には2番目の方法で動作します。作業待ち行列があり、スレッドはそれらを通って行きます。 'MaxDegreeOfParallelism = 2'を指定すると、処理の全期間を通して3つ以上の異なるスレッドが得られる可能性は非常に高いですが、いつでも2つしか実行されません。実際の順序は、第2の例、すなわち[0,1,2,4,3,6,5、...]である。きめ細かくインターリーブされています。 –

+1

@chibacity:情報ありがとうございます。面白い。それは私が実験から得た結果と矛盾します。私はどこに間違って行ったか見に行く必要があります。 –

+1

例:public void test() { var opts = new ParallelOptions {MaxDegreeOfParallelism = 2}; var r =新しいリスト(); INT T = Thread.CurrentThread.ManagedThreadId; ロック(R)r.Add(新しい結果 Parallel.ForEach(Enumerable.Range(0、1000)、I => { のThread.sleep(1)、オプト{Th = t、I = i}); }); (1)。、x.Th、x.I));};}}};}}}};}}}} } public class Result { public int I、Th; } –

関連する問題