2017-05-12 11 views
0

私はCreateメソッドを持っています。このメソッドは、サービスバスのメッセージキュー(https://azure.microsoft.com/en-us/services/service-bus/)に新しいメッセージが表示されたときに実行されます。
Createの5つのタスクに対するすべてのコールに対して、並行して実行できる並行タスクの総数を制限しようとしています。
私のコードでは、Parallel.ForEachは何もしないようです。実行中の同時並行タスクの制限

mutex.WaitOne(); 
if(curretNumTasks < MaxTasks) 
{ 
    tasks.Add(makePdfAsync(form)); 
} 
mutex.ReleaseMutex(); 

を、それは非常に遅く、サービスバススローを行います

私はこのようなmakePdfAsync()呼び出しの周りにミューテックス/ロックを追加しようとしました。

Createのすべての呼び出しで作成される並行タスクの数を制限する方法を教えてください。

public async Task Create(List<FormModel> forms) 
{ 
    var tasks = new List<Task>(); 

    Parallel.ForEach(forms, new ParallelOptions { MaxDegreeOfParallelism = 5 }, form => 
    { 
     tasks.Add(makePdfAsync(form)); 
    }); 
    await Task.WhenAny(Task.WhenAll(tasks), Task.Delay(TimeSpan.FromMinutes(10))); 
} 

public async Task makePdfAsync() 
{ 
    var message = new PdfMessageModel(); 
    message.forms = new List<FormModel>() { form }; 

    var retry = 10; 
    var uri = new Uri("http://localhost.:8007"); 
    var json = JsonConvert.SerializeObject(message); 

    using (var wc = new WebClient()) 
    { 
     wc.Encoding = System.Text.Encoding.UTF8; 

     // reconnect with delay in case process is not ready 
     while (true) 
     { 
      try 
      { 
       await wc.UploadStringTaskAsync(uri, json); 
       break; 
      } 
      catch 
      { 
       if (retry-- == 0) throw; 
      } 
     } 
    } 
} 

TL; DR。 Createは、クラスのメソッドであり、同時に多くのインスタンスで呼び出されます。同時実行性は2倍です。 Createの複数回の呼び出しと同時に、Createの各呼び出しの中で、いくつかのタスクが同時に実行されます。
どのようにして1つの時点で実行されているタスクの総数を制限できますか?

+0

'Task.WhenAll(tasks)'に何らかの影響を与えます。したがって、 'Parallel.ForEach'はこのコードでは使用しません。 –

+0

これは2回目の質問ですが、答えは同じです: 'SemaphoreSlim'を使用してください。何らかの理由でこれがあなたのシナリオに当てはまらない場合は、あなたの質問を更新して、なぜこれが当てはまるのかを明確にしてください。 –

答えて

0

システム全体のセマフォを使用して見ることができますか?例えば

:私が正しくあなたを理解

var throttle = new Semaphore(5,5,"pdftaskthrottle"); 

if (throttle.WaitOne(5000)){ 
    try{ 
     //do some task/thread stuff 
     ..... 
    } catch(Exception ex){ 
     // handle 
    } finally { 
     //always remember to release the semaphore 
     throttle.Release(); 
    } 
} else { 
    //we timed out ... try again? 
} 
0

場合は、効果的に5つのタスクの上限とプロデューサ/コンシューマキューをしたいです。それがあなたの後であればBlockingCollectionが最高です。それは内部的に非常に良い性能を持っており、必要に応じてブロックするためにSemaphoreSlimを使用しています。また、Taskを一緒に活用することもできます。 BlockingCollection<Task<T>>を作成します。 "C#を要約すると"これの良いセクションがあります。以下のコードを一般的な例として参照してください。また、可能であれば、mutexのようなカーネルモードの同期構造を使用しないようにしてください(管理コードからネイティブコードに移行するために支払う必要があります)。あなただけの `` MaxDegreeOfParallelism = 5`にはないすべてのタスクが `Task.WhenAll(タスク)で実行されている..あなたはparallel.foreachを経由して、それを実行していない、タスクリストを平行に構築している

 class PCQueue : IDisposable 
     { 
      private BlockingCollection<Task> _taskQueue = new BlockingCollection<Task>(); 
      public PCQueue(int workerCount) 
      { 
       for (int i = 0; i < workerCount; i++) 
        Task.Factory.StartNew(Consume); 
      } 

      public Task Enqueue(Action action, CancellationToken cancelToken = default(CancellationToken)) 
      { 
       //! A task object can either be generated using TaskCompletionSource or instantiated directly (an unstarted or cold task!). 
       var task = new Task(action, cancelToken); 
       _taskQueue.Add(task); //? Create a cold task and enqueue it. 
       return task; 
      } 

      public Task<TResult> Enqueue<TResult>(Func<TResult> func, CancellationToken cancelToken = default(CancellationToken)) 
      { 
       var task = new Task<TResult>(func, cancelToken); 
       _taskQueue.Add(task); 
       return task; 
      } 

      void Consume() 
      { 
       foreach (var task in _taskQueue.GetConsumingEnumerable()) 
       { 
        try 
        { 
         //! We run the task synchronously on the consumer's thread. 
         if (!task.IsCanceled) task.RunSynchronously(); 
        } 
        catch (InvalidOperationException) 
        { 
         //! Handle the unlikely event that the task is canceled in between checking whether it's canceled and running it. 
         // race condition! 
        } 
       } 
      } 

      public void Dispose() => _taskQueue.CompleteAdding(); 
     } 
+0

上限を指定していません – MickyD

関連する問題