2017-11-21 15 views
5

PLYNQの専門家からの助けに感謝します。私は答えを見直す時間がかかるでしょう、私はmath.SEのより確立されたプロフィールを持っています。ParallelQuery.Aggregateが並行して実行されない可能性がある理由

私はタイプParallelQuery<List<string>>のオブジェクトを持っています。これには、並行して処理したい44個のリスト(一度に5個)があります。 私のプロセスは、以下のように、ブール値のペアである結果を返され

private ProcessResult Process(List<string> input) 

処理のような署名を有します。

private struct ProcessResult 
    { 
     public ProcessResult(bool initialised, bool successful) 
     { 
      ProcessInitialised = initialised; 
      ProcessSuccessful = successful; 
     } 

     public bool ProcessInitialised { get; } 
     public bool ProcessSuccessful { get; } 
    } 

問題です。IEnumerable<List<string>> processMeを指定すると、私のPLYNQクエリはこのメソッドを実装しようとします:https://msdn.microsoft.com/en-us/library/dd384151(v=vs.110).aspx

processMe.AsParallel() 
     .Aggregate<List<string>, ConcurrentStack<ProcessResult>, ProcessResult> 
      (
       new ConcurrentStack<ProcessResult>, //aggregator seed 
       (agg, input) => 
       {       //updating the aggregate result 
        var res = Process(input); 
        agg.Push(res); 
        return agg; 
       }, 
       agg => 
       {       //obtain the result from the aggregator agg 
        ProcessResult res; // (in this case just the most recent result**) 
        agg.TryPop(out res); 
        return res; 
       } 
      ); 

残念ながら、それは並行して実行されず、順番にしか実行されません。 (**私はちょうど今の仕事に並列化を取得しようとしています、この実装は行わないという「感覚」を注意してください。)私は、並列に走行を行った若干異なる実装を、試してみました


しかし、凝集はなかった。私は凝集法(本質的には、ProcessResultの両方の部分、すなわち凝集体([A1、A2]、[B1、B2])≡[A1 & & B1、A2 & & B2]の両方のブールANDである)を定義した。

private static ProcessResult AggregateProcessResults 
     (ProcessResult aggregate, ProcessResult latest) 
    { 
     bool ini = false, suc = false; 
     if (aggregate.ProcessInitialised && latest.ProcessInitialised) 
      ini = true; 
     if (aggregate.ProcessSuccessful && latest.ProcessSuccessful) 
      suc = true; 


     return new ProcessResult(ini, suc); 
    } 

そしてここPLYNQクエリhttps://msdn.microsoft.com/en-us/library/dd383667(v=vs.110).aspx

.Aggregate<List<string>, ProcessResult, ProcessResult>(
    new ProcessResult(true, true), 
    (res, input) => Process(input), 
    (agg, latest) => AggregateProcessResults(agg, latest), 
    agg   => agg 

問題がAggregateProcessResultsコードが何らかの理由-Iは、結果が行っていた場所無知だ...

ため、ヒットはなかったということでした使用

読んでいただきありがとうございます、どんなヘルプをいただきありがとうございます:)

+0

代わりに、別のオーバーロードを使用します。あなたがしようとしている作業のための正しい操作を使用すると、システムがはるかに効果的にそれを達成できることがわかります。 – Servy

+0

あなたのコレクションにはいくつのアイテムがありますか? (わずか44?)あなたはいくつのCPUコアを持っていますか?複数のTreadおよび複数のCPUコアでクエリを実行するには、複雑な準備が必要です。コレクションは、使用可能なCPUコア数と同じ数の部分に分割し、スレッドでタスクを実行し、最後に結果を集計する必要があります。だから、.NETは、すべての作業をずっと遅くするために多くの作業をしないほどスマートです... – Major

+0

@メジャー私は22000文字列を持っています。これは500にバッチ処理され、44リストを与えます。私は同時に5つのプロセスを実行することに限定されています – Szmagpie

答えて

2

Aggregateのオーバーロードは本当にpで実行されません並行して、設計によって。シードを渡してからステップ関数を実行しますが、ステップ関数(agg)への引数は以前のステップから受け取ったアキュムレータです。そのため、本質的にシーケンシャル(前のステップの結果は次のステップへの入力)であり、並列化はできません。なぜこの過負荷がParallelEnumerableに含まれているのか分かりませんが、おそらく理由がありました。あなたは順番に各項目の新しい値を計算したい場合は、 `SELECT`、ない` Aggregate`を使用する必要があります

var result = processMe 
.AsParallel() 
.Aggregate 
(
    // seed factory. Each partition will call this to get its own seed 
    () => new ConcurrentStack<ProcessResult>(), 
    // process element and update accumulator 
    (agg, input) => 
    {           
     var res = Process(input); 
     agg.Push(res); 
     return agg; 
    }, 
    // combine accumulators from different partitions 
    (agg1, agg2) => { 
     agg1.PushRange(agg2.ToArray()); 
     return agg1; 
    }, 
    // reduce 
    agg => 
    { 
     ProcessResult res; 
     agg.TryPop(out res); 
     return res; 
    } 
); 
関連する問題