2017-08-15 5 views
-3

DataTableに1k行があり、Parallel.ForEachを使用して行を反復処理しています 次のメソッドは行ごとに反復処理され、すべての行は後でSMTPキューがEMLファイルを選択し、それらにDataTable内の行に複数回アクセスするParallel.ForEach

public static bool GenerateValidEmlFiles(DataTable valids) 
    { 
     wroteToDb = false; 
     // init. cmp id from the table 
     CmpId = int.Parse(valids.Rows[0][CampaignId].ToString()); 

     Parallel.ForEach(valids.AsEnumerable(), new ParallelOptions { MaxDegreeOfParallelism = 1 }, (currentRow) => 
     {     
      CancellationTokenSource tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); 
      CancellationToken token = tokenSource.Token; 

      // create new task "thread" for every row in the DataTable 
      Task task = Task.Factory.StartNew(() => 
      { 
       try 
       { 


        if (token.IsCancellationRequested) 
         token.ThrowIfCancellationRequested(); 
        string id = currentRow[CampaignRecipientId].ToString(); 
        writeEvent(int.Parse(id), "Recipient row is being processed"); 

        // MailBee.Mime MailMessage 
        using (MailMessage msg = new MailMessage()) 
        { 
         // init. parameters 
         msg.From.AsString = currentRow[EmailFrom].ToString(); 
         msg.To.AsString = currentRow[EmailTo].ToString(); 
         msg.ReplyTo.AsString = currentRow[EmailReplyTo].ToString(); 
         msg.Subject = "=?UTF-8?B?" + Convert.ToBase64String(Encoding.UTF8.GetBytes(currentRow[EmailSubject].ToString())) + "?="; 
         msg.BodyHtmlText = HTMLTags.Replace("ReplaceBody", currentRow[EmailBody].ToString()); 
         //assing X-TWC id number into the header. 
         msg.Headers.Add(ConfigurationManager.AppSettings["TWCHeader"].ToString(), id, false); 
         //if there is an attachment add it to the Message 
         if ((currentRow[EmailAttachment] as object != null) && !string.IsNullOrEmpty(currentRow[EmailAttachmentName].ToString())) 
         { 
          byte[] filedata = (byte[])currentRow[EmailAttachment]; 
          msg.Attachments.Add(filedata, currentRow[EmailAttachmentName].ToString(), "", "", null, NewAttachmentOptions.None, MailTransferEncoding.None); 
         } 

         msg.EncodeAllHeaders(Encoding.UTF8, MailBee.Mime.HeaderEncodingOptions.Base64); 
         //save message as *.Eml to be sent by the SMTP Queue       
         msg.SaveMessage(@"E:\WEBS\Ready\" + id + "_" + msg.To.AsString + ".eml"); 
         writeEvent(int.Parse(id), "EML file written to Disk"); 

         //adding ID to a list to write the whole list back to the DB in a single DB call. 
         if (!IDs.Contains(id)) 
          IDs.Add(id); 

        }       

       } 
       catch (Exception ex) 
       { 
        using (StreamWriter sw = new StreamWriter(AppDomain.CurrentDomain.BaseDirectory + "\\INNER.txt", true)) 
        { 
         sw.WriteLine(DateTime.Now.ToString() + ": " + ex.Source.ToString().Trim() + ", " + ex.Message.ToString().Trim() + ex.StackTrace); 
         sw.Flush(); 
         sw.Close(); 
        } 
       } 

      }, token); 
      task.Wait(); 
     }); 

     if (!wroteToDb) 
     { 
      WriteEmlEvents(); 
      //set lock flag to true 
      wroteToDb = true; 
      return true; 
     } 
     return false; 
    } 

をお送りします問題は最初の行がされていることではMailMessage、それははMailMessageのパラメータを初期化し、それが* .emlファイル としてディスクに保存します2回処理されるか、FroEachによって2回アクセスされます。 DataTableの重複をチェックしようとしましたが、重複する行が全く見つかりませんでした。 DataTableの代わりにDataReaderを使用することをお勧めしますか?

私が提案答えを試みたが、私はこの例外が発生しました: FYI ========私はMaxDegreeOfParallelismを削除するときにのみ発生し、私はこれを1に設定すると、それは細かい

8/15/2017 3:37:05 PM: MailBee.NET.45, IOException occurred. InnerException message follows: The process cannot access the file 'E:\WEBS\TerranovaQueue Files\Ready\[email protected]' because it is being used by another process. at a.n.b(String A_0, Byte[] A_1, Int32 A_2, Int32 A_3, Byte[] A_4) 
    at MailBee.Mime.MailMessage.SaveMessage(String filename) 

System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: Collection was modified; enumeration operation might not execute. 
    at System.Data.RBTree`1.RBTreeEnumerator.MoveNext() 
    at System.Linq.Enumerable.<CastIterator>d__94`1.MoveNext() 
    at System.Collections.Concurrent.Partitioner.DynamicPartitionerForIEnumerable`1.InternalPartitionEnumerable.GrabChunk_Buffered(KeyValuePair`2[] destArray, Int32 requestedChunkSize, Int32& actualNumElementsGrabbed) 
    at System.Collections.Concurrent.Partitioner.DynamicPartitionerForIEnumerable`1.InternalPartitionEnumerable.GrabChunk(KeyValuePair`2[] destArray, Int32 requestedChunkSize, Int32& actualNumElementsGrabbed) 
    at System.Collections.Concurrent.Partitioner.DynamicPartitionerForIEnumerable`1.InternalPartitionEnumerator.GrabNextChunk(Int32 requestedChunkSize) 
    at System.Collections.Concurrent.Partitioner.DynamicPartitionEnumerator_Abstract`2.MoveNext() 
    at System.Threading.Tasks.Parallel.<>c__DisplayClass42_0`2.<PartitionerForEachWorker>b__1() 
    at System.Threading.Tasks.Task.InnerInvoke() 
    at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask) 
    at System.Threading.Tasks.Task.<>c__DisplayClass176_0.<ExecuteSelfReplicating>b__0(Object) 
    --- End of inner exception stack trace --- 
    at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions) 
    at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken) 
    at System.Threading.Tasks.Task.Wait() 
    at System.Threading.Tasks.Parallel.PartitionerForEachWorker[TSource,TLocal](Partitioner`1 source, ParallelOptions parallelOptions, Action`1 simpleBody, Action`2 bodyWithState, Action`3 bodyWithStateAndIndex, Func`4 bodyWithStateAndLocal, Func`5 bodyWithEverything, Func`1 localInit, Action`1 localFinally) 
    at System.Threading.Tasks.Parallel.ForEachWorker[TSource,TLocal](IEnumerable`1 source, ParallelOptions parallelOptions, Action`1 body, Action`2 bodyWithState, Action`3 bodyWithStateAndIndex, Func`4 bodyWithStateAndLocal, Func`5 bodyWithEverything, Func`1 localInit, Action`1 localFinally) 
    at System.Threading.Tasks.Parallel.ForEach[TSource](IEnumerable`1 source, Action`1 body) 
    at EmailValidatorLibrary.EmailGenerator.GenerateValidEmlFiles(DataTable valids) in C:\Users\basel.abdo\Source\Terranova\Terranova-Preprod\TerranovaService\EmailValidatorLibrary\EmailGenerator.cs:line 50 
    at TerranovaService.TerranovaService.StartProcess() in C:\Users\basel.abdo\Source\Terranova\Terranova-Preprod\TerranovaService\TerranovaService\TerranovaService.cs:line 133 
---> (Inner Exception #0) System.InvalidOperationException: Collection was modified; enumeration operation might not execute. 
    at System.Data.RBTree`1.RBTreeEnumerator.MoveNext() 
    at System.Linq.Enumerable.<CastIterator>d__94`1.MoveNext() 
    at System.Collections.Concurrent.Partitioner.DynamicPartitionerForIEnumerable`1.InternalPartitionEnumerable.GrabChunk_Buffered(KeyValuePair`2[] destArray, Int32 requestedChunkSize, Int32& actualNumElementsGrabbed) 
    at System.Collections.Concurrent.Partitioner.DynamicPartitionerForIEnumerable`1.InternalPartitionEnumerable.GrabChunk(KeyValuePair`2[] destArray, Int32 requestedChunkSize, Int32& actualNumElementsGrabbed) 
    at System.Collections.Concurrent.Partitioner.DynamicPartitionerForIEnumerable`1.InternalPartitionEnumerator.GrabNextChunk(Int32 requestedChunkSize) 
    at System.Collections.Concurrent.Partitioner.DynamicPartitionEnumerator_Abstract`2.MoveNext() 
    at System.Threading.Tasks.Parallel.<>c__DisplayClass42_0`2.<PartitionerForEachWorker>b__1() 
    at System.Threading.Tasks.Task.InnerInvoke() 
    at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask) 
    at System.Threading.Tasks.Task.<>c__DisplayClass176_0.<ExecuteSelfReplicating>b__0(Object)<--- 

---> (Inner Exception #1) System.IO.IOException: The process cannot access the file 'E:\WEBS\TWC Mail Services\INNER.txt' because it is being used by another process. 
    at System.IO.__Error.WinIOError(Int32 errorCode, String maybeFullPath) 
    at System.IO.FileStream.Init(String path, FileMode mode, FileAccess access, Int32 rights, Boolean useRights, FileShare share, Int32 bufferSize, FileOptions options, SECURITY_ATTRIBUTES secAttrs, String msgPath, Boolean bFromProxy, Boolean useLongPath, Boolean checkHost) 
    at System.IO.FileStream..ctor(String path, FileMode mode, FileAccess access, FileShare share, Int32 bufferSize, FileOptions options, String msgPath, Boolean bFromProxy, Boolean useLongPath, Boolean checkHost) 
    at System.IO.StreamWriter.CreateFile(String path, Boolean append, Boolean checkHost) 
    at System.IO.StreamWriter..ctor(String path, Boolean append, Encoding encoding, Int32 bufferSize, Boolean checkHost) 
    at System.IO.StreamWriter..ctor(String path, Boolean append) 
    at EmailValidatorLibrary.EmailGenerator.<>c.<GenerateValidEmlFiles>b__19_0(DataRow currentRow) in C:\Users\basel.abdo\Source\Terranova\Terranova-Preprod\TerranovaService\EmailValidatorLibrary\EmailGenerator.cs:line 107 
    at System.Threading.Tasks.Parallel.<>c__DisplayClass42_0`2.<PartitionerForEachWorker>b__1() 
    at System.Threading.Tasks.Task.InnerInvoke() 
    at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask) 
    at System.Threading.Tasks.Task.<>c__DisplayClass176_0.<ExecuteSelfReplicating>b__0(Object)<--- 
mscorlib 

の作品========================= UPDATE foreachはまだ同じ行を複数回! 私は実際に log

+0

間違っていることを示すものは何もありません。 「作業中です」というコードは何ですか、詳細な情報が必要です。 –

+1

あなたは重要な部分を省いた。どのコードが実際にDataTableにアクセスして動作しますか?また、 'Parallel.ForEach'を使っているのであれば、おそらく' Task'ですべてのことを気にする必要はありません。 –

+0

@JakubDąbek質問内のコードを更新しました –

答えて

0

あなたが経験していることclosureと古典的な問題であり、何が起こっているかを確認するためにtimestmapとロガーを追加しました。変数currentRowはクロージャーに参加します。これは、とりわけ、Taskが実行される前にその値が変更される可能性があることを意味します。

この問題を解決するには、Taskを削除します。 Parallel.ForEachに渡すラムダ式でコードを直接実行して(最大並列度を上げてください)これはすでに別のスレッド(Parallel.ForEachのため)で実行されているので、Taskにラップすることは絶対にありません。

public static bool GenerateValidEmlFiles(DataTable valids) 
{ 
    wroteToDb = false; 
    // init. cmp id from the table 
    CmpId = int.Parse(valids.Rows[0][CampaignId].ToString()); 

    Parallel.ForEach(valids.AsEnumerable(), new ParallelOptions { MaxDegreeOfParallelism = 10 }, (currentRow) => 
    {     
     string id = currentRow[CampaignRecipientId].ToString(); 
     writeEvent(int.Parse(id), "Recipient row is being processed"); 

     // MailBee.Mime MailMessage 
     using (MailMessage msg = new MailMessage()) 
     { 
      //etc. etc. 

P.S. IDsが同時性をサポートするタイプ(例えば、ConcurrentBag<int>)であることを確認し、writeEventがスレッドセーフであることを確認してください。

+0

' currentRow'はラムダのパラメータです。クロージャにはありません。 –

+0

両方です。これは、 'ForEach'に対してラムダのパラメータです。 'Task'に関しては閉鎖されています。だから私は 'タスク'を取り除くと言う。 –

+1

問題はOPのコードが実質的に同期していることです。とにかく 'Task'のための' Wait'です。 –

関連する問題