2012-10-10 36 views
19

私はIDのリストを持っており、各IDにいくつかのストアドプロシージャを実行する必要があります。Entity Frameworkとの並列処理はありません

私は標準的なforeachループを使用していますが、正常に動作しますが、私は多くのレコードを持っているとかなり遅いです。

私はEFで動作するようにコードを変換したかったのですが、例外が発生しました。「基礎となるプロバイダがOpenで失敗しました」。

私はParallel.ForEachの内側に、このコードを使用しています:

using (XmlEntities osContext = new XmlEntities()) 
{ 
    //The code 
} 

をしかし、それはまだ例外がスローされます。

パラレルでEFを使用するにはどうすればよいですか?実行しているすべてのプロシージャに対して新しいコンテキストを作成する必要がありますか?私は約10の手続きを持っているので、それぞれ10個のコンテキストを作成することは非常に悪いと思います。

+1

私はマルチスレッドの専門家ではありませんが、トランザクションを使用している場合や読み書きがロックされている場合は、直列的に行うよりもパフォーマンスが向上しない場合があります。 – Matthew

+0

私は、より多くのスレッドで強調されたSQL Serverを打つことはパフォーマンスに役立つことはないとは思わない... – rene

+2

SQLは全くストレスがかからず、なぜ並列を使用したいのか?それは確かに速く実行されます。 –

答えて

31

The underlying database connections that the Entity Framework are using are not thread-safeは、実行する別のスレッドで操作ごとに新しいコンテキストを作成する必要があります。

操作を並列化する方法についてのあなたの懸念は有効です。多くの文脈が開いたり閉じたりするのに高価になるだろう。

代わりに、コードを並列化する方法を逆にしたい場合があります。それはあなたがいくつかの項目をループしていると、各項目のためにシリアルでストアドプロシージャを呼び出しているようです。その後、

(あなたが結果を必要としない場合、またはTask)あなたは、各手順のための新しいTask<TResult>を作成できるかどうかと、そのTask<TResult>に、単一のコンテキストを開いて、すべてのアイテムをループし、ストアドプロシージャを実行します。このようにして、実行中のストアドプロシージャの数と同じ数のコンテキストのみが並行して実行されます。

は、あなたがクラスのインスタンスを取り、どちらも2羽のストアドプロシージャ、DoSomething1DoSomething2、とMyDbContextMyItemを持っていると仮定しましょう。あなたはが並行してストアドプロシージャを(それぞれが特定の順序で実行されているに依存します)を実行できない場合

// You'd probably want to materialize this into an IList<T> to avoid 
// warnings about multiple iterations of an IEnumerable<T>. 
// You definitely *don't* want this to be an IQueryable<T> 
// returned from a context. 
IEnumerable<MyItem> items = ...; 

// The first stored procedure is called here. 
Task t1 = Task.Run(() => { 
    // Create the context. 
    using (var ctx = new MyDbContext()) 
    // Cycle through each item. 
    foreach (MyItem item in items) 
    { 
     // Call the first stored procedure. 
     // You'd of course, have to do something with item here. 
     ctx.DoSomething1(item); 
    } 
}); 

// The second stored procedure is called here. 
Task t2 = Task.Run(() => { 
    // Create the context. 
    using (var ctx = new MyDbContext()) 
    // Cycle through each item. 
    foreach (MyItem item in items) 
    { 
     // Call the first stored procedure. 
     // You'd of course, have to do something with item here. 
     ctx.DoSomething2(item); 
    } 
}); 

// Do something when both of the tasks are done. 

、その後、あなたはまだすることができ:

のような上記になります何かを実装しますあなたの操作を並列化してください。ちょっと複雑です。

Partitioner classの静的Create methodを使用すると、アイテム全体でcreating custom partitionsが表示されます。これにより、IEnumerator<T>実装を取得する手段が提供されます(これはではありません。IEnumerable<T>なのでforeachにはできません)。

あなたが戻って取得各IEnumerator<T>たとえば、あなたが作成したい新しいTask<TResult>(あなたは結果が必要な場合)、およびTask<TResult>ボディに、あなたが呼び出し、IEnumerator<T>によって返された項目によるコンテキストと、その後のサイクルを作成しますストアドプロシージャを順番に実行します。

// Get the partitioner. 
OrdinalPartitioner<MyItem> partitioner = Partitioner.Create(items); 

// Get the partitions. 
// You'll have to set the parameter for the number of partitions here. 
// See the link for creating custom partitions for more 
// creation strategies. 
IList<IEnumerator<MyItem>> paritions = partitioner.GetPartitions(
    Environment.ProcessorCount); 

// Create a task for each partition. 
Task[] tasks = partitions.Select(p => Task.Run(() => { 
     // Create the context. 
     using (var ctx = new MyDbContext()) 
     // Remember, the IEnumerator<T> implementation 
     // might implement IDisposable. 
     using (p) 
     // While there are items in p. 
     while (p.MoveNext()) 
     { 
      // Get the current item. 
      MyItem current = p.Current; 

      // Call the stored procedures. Process the item 
      ctx.DoSomething1(current); 
      ctx.DoSomething2(current); 
     } 
    })). 
    // ToArray is needed (or something to materialize the list) to 
    // avoid deferred execution. 
    ToArray(); 
0

それは、もしあれば、内部例外結果が何であるかを知らなくても、このいずれかをトラブルシューティングするために少し難しいです:次のようになります

。これは、接続文字列またはプロバイダ構成が設定される方法に非常に単純に問題があります。

通常、並列コードとEFには注意が必要です。しかし、あなたがやっていることはありますか?私の心の中の1つの質問。そのコンテキストの別のインスタンスで行われている作業はどれですかの前に、のパラレル?あなたの記事によれば、あなたは各スレッドで別々のコンテキストを実行しています。それは良い。しかし、私の一部は、複数のコンテキストの間で起こっている興味深いコンストラクタの競合があるかどうか不思議です。その並列呼び出しの前にどこでもそのコンテキストを使用していない場合は、コンテキストに対して簡単なクエリを実行して開き、並列メソッドを実行する前にすべてのEFビットが起動されていることを確認することをお勧めします。私は認めます、私は試していません正確にあなたがここで何をしたのですが、私は近くして働いています。

+1

を見ているというロックの競合によっても制限されていますが、2012年にこの質問が投稿されたことはわかりますか? – Claies

2

これは私が使用しており、素晴らしい作品です。それは、さらに、エラー例外の処理をサポートし、あなたがDBは、元DbContextとdb.CreateInstance(あるとしてどこ次のようにそれを使用することがはるかに簡単

public static ConcurrentQueue<Exception> Parallel<T>(this IEnumerable<T> items, Action<T> action, int? parallelCount = null, bool debugMode = false) 
{ 
    var exceptions = new ConcurrentQueue<Exception>(); 
    if (debugMode) 
    { 
     foreach (var item in items) 
     { 
      try 
      { 
       action(item); 
      } 
      // Store the exception and continue with the loop.      
      catch (Exception e) 
      { 
       exceptions.Enqueue(e); 
      } 
     } 
    } 
    else 
    { 
     var partitions = Partitioner.Create(items).GetPartitions(parallelCount ?? Environment.ProcessorCount).Select(partition => Task.Factory.StartNew(() => 
     { 
      while (partition.MoveNext()) 
      { 
       try 
       { 
        action(partition.Current); 
       } 
       // Store the exception and continue with the loop.      
       catch (Exception e) 
       { 
        exceptions.Enqueue(e); 
       } 
      } 
     })); 
     Task.WaitAll(partitions.ToArray()); 
    } 
    return exceptions; 
} 

物事を追跡することができ、デバッグモードを持っている)Aを作成します同じ接続文字列を使用する新しいインスタンス。

 var batch = db.Set<SomeListToIterate>().ToList(); 
     var exceptions = batch.Parallel((item) => 
     { 
      using (var batchDb = db.CreateInstance()) 
      { 
       var batchTime = batchDb.GetDBTime(); 
       var someData = batchDb.Set<Permission>().Where(x=>x.ID = item.ID).ToList(); 
       //do stuff to someData 
       item.WasMigrated = true; //note that this record is attached to db not batchDb and will only be saved when db.SaveChanges() is called 
       batchDb.SaveChanges();   
      }     
     }); 
     if (exceptions.Count > 0) 
     { 
      logger.Error("ContactRecordMigration : Content: Error processing one or more records", new AggregateException(exceptions)); 
      throw new AggregateException(exceptions); //optionally throw an exception 
     } 
     db.SaveChanges(); //save the item modifications 
関連する問題