2017-10-16 12 views
5

Parallel.ForEachを使用してさまざまなサイズのPDFを処理しています(数百MBの単純な2MBから高いDPIスキャンまで)、時折OutOfMemoryExceptionになります。プロセスは32ビットであり、Parallel.ForEachによって生成されたスレッドは、未知の量のメモリを消費します。Parallel.ForEachでカスタムTaskSchedulerを使用してOutOfMemoryExceptionを防止する

MaxDegreeOfParallelismを制限すると動作しますが、小さいPDFファイルの大きな(10k +)バッチがあるときのスループットでは、前記スレッドのメモリフットプリントが小さいために多くのスレッドが動作する可能性があるため十分ではありません。これはParallel.ForEachがCPUの100%CPUに簡単に到達し、大規模なPDFのグループに当たってOutOfMemoryExceptionを取得する前に、重いプロセスです。パフォーマンスプロファイラを実行すると、これがサポートされます。

Parallel.ForEach用のパーティショナーを使用してもパフォーマンスは向上しません。

これは、MemoryFailPoint小切手で私のParallel.ForEachに渡されたカスタムTaskSchedulerを使用しています。その周辺を検索すると、カスタムのTaskSchedulerオブジェクトの作成に関する不十分な情報があるようです。 StackOverflowの上ここA custom TaskScheduler in C#Specialized Task Schedulers in .NET 4 Parallel Extensions Extrasの間、様々な答えを探し

は、私のような私のTaskSchedulerを自身と持って、私のQueueTaskメソッドを作成しました:

protected override void QueueTask(Task task) 
{ 
    lock (tasks) tasks.AddLast(task); 
    try 
    { 
     using (MemoryFailPoint memFailPoint = new MemoryFailPoint(600)) 
     { 
      if (runningOrQueuedCount < maxDegreeOfParallelism) 
      { 
       runningOrQueuedCount++; 
       RunTasks(); 
      } 
     } 
    } 
    catch (InsufficientMemoryException e) 
    {  
     // somehow return thread to pool?   
     Console.WriteLine("InsufficientMemoryException"); 
    } 
} 

のtry/catchは少し高価な私の目標はここですが600MBの可能な最大サイズのPDF(+余分なメモリオーバヘッド)がOutOfMemoryExceptionをスローするときに捕捉することです。この解決策は、InsufficientMemoryExceptionを捕まえたときに作業を試みているスレッドを殺しているようです。十分な大きさのPDFがあれば、私のコードは1つのスレッドParallel.ForEachになります。

、次のいずれか

  • MemoryFailPointチェックで拒否されたスレッドをスレッドプールに戻すにはどうすればよいですか?
  • 空きメモリがある場合、どのように新しいスレッドを安全に作成して作業を再開するのですか?

編集: ディスク上のPDFのサイズが直線によるPDFコンテンツに依存してラスタライズし、ラスタライズ画像操作コンポーネントにメモリ内のサイズを表していてもよいです。

答えて

0

LimitedConcurrencyLevelTaskSchedulerからSamples for Parallel Programming with the .NET Frameworkまで私はマイナーな調整を行って、私が望むものを探していました。

private void NotifyThreadPoolOfPendingWork() 
{ 
    ThreadPool.UnsafeQueueUserWorkItem(_ => 
    { 
     // Note that the current thread is now processing work items. 
     // This is necessary to enable inlining of tasks into this thread. 
     _currentThreadIsProcessingItems = true; 
     try 
     { 
      // Process all available items in the queue. 
      while (true) 
      { 
       Task item; 
       lock (_tasks) 
       { 
        // When there are no more items to be processed, 
        // note that we're done processing, and get out. 
        if (_tasks.Count == 0) 
        { 
         --_delegatesQueuedOrRunning; 
         break; 
        } 

        // Get the next item from the queue 
        item = _tasks.First.Value; 
        _tasks.RemoveFirst(); 
       } 

       // Execute the task we pulled out of the queue 
       //base.TryExecuteTask(item); 

       try 
       { 
        using (MemoryFailPoint memFailPoint = new MemoryFailPoint(650)) 
        { 
         base.TryExecuteTask(item); 
        } 
       } 
       catch (InsufficientMemoryException e) 
       { 
        Thread.Sleep(500); 

        lock (_tasks) 
        { 
         _tasks.AddLast(item); 
        } 
       } 

      } 
     } 
     // We're done processing items on the current thread 
     finally { _currentThreadIsProcessingItems = false; } 
    }, null); 
} 

我々はキャッチを見てみましょうが、逆に:以下はNotifyThreadPoolOfPendingWork変更後LimitedConcurrencyLevelTaskSchedulerクラスのメソッドです。私たちは、作業を開始するタスクをタスクリスト(_tasks)に追加します。このタスクは、使用可能なスレッドがその作業を実行するためのイベントをトリガーします。しかし、私たちは最初に現在のスレッドをスリープ状態にして、直ちに作業を取り上げずに、失敗したMemoryFailPointチェックに戻ります。

関連する問題