2011-12-19 4 views
6

私はParallel.ForEachを使用しているコードを持っています。これはおそらく古いバージョンのRx拡張機能やTasks Parallel Libraryに基づいています。 Rx拡張機能の現在のバージョンをインストールしましたが、Parallel.ForEachが見つかりません。私は、ライブラリの他の派手なものを使用して、ちょうどこのような並列にいくつかのデータを処理したくないよ:Rx拡張機能:Parallel.ForEachはどこですか?

Parallel.ForEach(records, ProcessRecord); 

私はthis questionを見つけたが、私は受信の古いバージョンに依存したくありません。しかし、私はRxに似た何かを見つけることができませんでした。だから、現在のRxバージョンを使用してそれを行うための現在の最も直接的な方法は何ですか?プロジェクトは.NET 3.5を使用しています。

+2

.NET 4.0にアップグレードしますか? – dtb

+0

[Reactive Extensions from .Net 3.5のParallel.ForEachが存在しない可能性があります](http://stackoverflow.com/questions/7398962/parallel-foreach-missing-from-reactive-extensions-for-net-3-5) –

+1

@ハサン:あなたが言及した質問にリンクしているので、私はそれを明らかに知っていました。しかし、答えは私が使用したくない古いRxバージョンを使用することを提案しています。 – Achim

答えて

24

あなたが受信している場合、すべてのこの愚かなgooseryを行う必要がありません:

records.ToObservable() 
    .SelectMany(x => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler)) 
    .ToList() 
    .First(); 

(または、あなたがしたい場合は項目の順序がで維持が効率のコスト):

records.ToObservable() 
    .Select(x => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler)) 
    .Concat() 
    .ToList() 
    .First(); 

または同時にいくつのアイテムを制限したい場合:

records.ToObservable() 
    .Select(x => Observable.Defer(() => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler))) 
    .Merge(5 /* at a time */) 
    .ToList() 
    .First(); 
+0

これはいつでもn個のアイテムのみを処理するように調整しますか?私は何百ものアイテムを使ってこれを試しています。それを一気にやろうとしています。これは私のアプリに大きなメモリ問題を引き起こしています。私はそれを一度に5回することに制限することができるようにしたいと思います。 – Clint

+2

制限付き同時性を含むように更新されました –

1

ここで簡単に交換です:

class Parallel 
{ 
    public static void ForEach<T>(IEnumerable<T> source, Action<T> body) 
    { 
     if (source == null) 
     { 
      throw new ArgumentNullException("source"); 
     } 
     if (body == null) 
     { 
      throw new ArgumentNullException("body"); 
     } 
     var items = new List<T>(source); 
     var countdown = new CountdownEvent(items.Count); 
     WaitCallback callback = state => 
     { 
      try 
      { 
       body((T)state); 
      } 
      finally 
      { 
       countdown.Signal(); 
      } 
     }; 
     foreach (var item in items) 
     { 
      ThreadPool.QueueUserWorkItem(callback, item); 
     } 
     countdown.Wait(); 
    } 
} 
+0

これは確かですが、すべてのアイテムが処理されたときにコードはどのように知っていますか? Parallel.ForEachはすべてのアイテムが処理されるまでブロックします。そのため気にする必要はありません。 – Achim

+1

明らかに、Parallel.ForEachの機能が単純化されています。私はすべてのアイテムが処理されるまでブロックする答えを広げました。 – dtb

+2

ここでのトレードオフは、4.0へのアップグレードと比較して検討する価値があります。 'Parallel.ForEach'の著者は並行性の程度とオーバーヘッド量のバランスを取るために多くの作業を行いました。一方、@ dtbは約30LoCを要する良い答えを出しました。あなたがこの答えがあなたがうまくいく必要があれば、そして幸せな日を見つければ。あなた自身が微調整や調整をしていて、何百ものLoCに成長しているコードを見つけたら、良いショートバージョンはなくなりました。しかし、悪いロングバージョンでは、あなたが本質的に4.0 'パラレル 'テストしました。 –

関連する問題