2016-09-28 5 views
0

サードパーティライブラリを使用して何千ものURLを実行するコードがあります。場合によっては、ライブラリ内のメソッドがハングし、スレッドを占有することがあります。しばらくすると、すべてのスレッドは何もしないプロセスによって取り込まれ、停止するまで粉砕されます。SemaphoreSlimを使用しているときにハングするスレッドを処理する方法

新しいスレッドの追加を制御するためにSemaphoreSlimを使用しているため、最適な数のタスクを実行できます。あまりにも長く実行されているタスクを特定し、それらを強制終了する方法が必要ですが、SemaphoreSlimからスレッドを解放して、新しいタスクを作成することもできます。

私はこのアプローチに苦労しているので、私がやっていることを真似るテストコードを作った。これは、すべてのスレッドがハングアップしてしまうように、ハングアップの確率が10%のタスクを作成します。

どうすればこれらをチェックし、それらを殺すべきですか?ここで

はコードです:

class Program 
{ 
    public static SemaphoreSlim semaphore; 
    public static List<Task> taskList; 
    static void Main(string[] args) 
    { 

     List<string> urlList = new List<string>(); 
     Console.WriteLine("Generating list"); 
     for (int i = 0; i < 1000; i++) 
     { 
      //adding random strings to simulate a large list of URLs to process 
      urlList.Add(Path.GetRandomFileName()); 
     } 
     Console.WriteLine("Queueing tasks"); 

     semaphore = new SemaphoreSlim(10, 10); 

     Task.Run(() => QueueTasks(urlList)); 

     Console.ReadLine(); 
    } 
    static void QueueTasks(List<string> urlList) 
    { 
     taskList = new List<Task>(); 

     foreach (var url in urlList) 
     { 
      Console.WriteLine("{0} tasks can enter the semaphore.", 
        semaphore.CurrentCount); 
      semaphore.Wait(); 

      taskList.Add(DoTheThing(url)); 
     } 
    } 
    static async Task DoTheThing(string url) 
    { 

     Random rand = new Random(); 

     // simulate the IO process 
     await Task.Delay(rand.Next(2000, 10000)); 

     // add a 10% chance that the thread will hang simulating what happens occasionally with http request 
     int chance = rand.Next(1, 100); 
     if (chance <= 10) 
     { 
      while (true) 
      { 
       await Task.Delay(1000000); 
      } 
     } 

     semaphore.Release(); 
     Console.WriteLine(url); 
    } 
} 
+0

最初はタスクを強制終了しないでください。いくつかの協調的なメカニズムを使用して操作を中断し、それが担当するセマフォを解放することができます。残念ながら、あなたの質問には、あなたのタスクの実装そのものを理解するために必要なすべての詳細が欠けていますが、一般的にはネットワークI/O(HTTPリクエストなど)を扱うときには、アクティブなスレッドを持つべきではありません。実行している操作のための.NET非同期API –

+0

「どうすれば...殺すべき?」 - あなたはすべきではありません - スレッドを殺すことは悪い悪いです。サードパーティのライブラリがクラッシュしている場合は、別の 'AppDomain'でライブラリを実行して、きれいに閉じることができます。 – Enigmativity

+1

完全分離のためには、AppDomainだけでなく、別の*プロセス*が必要である点を除いて、@Enigmativityに同意します。 –

答えて

0

人がすでに指摘したように、一般的にスレッドを中止することは悪いとC#でそれをやってのない確実な方法はありません。別のプロセスを使って作業をしてからそれを終了させるのは、Thread.Abortを試みるよりも少し良いアイデアです。まだ最良の方法はありません。理想的には、協調スレッド/プロセスが必要で、IPCを使用して自分自身を救済する時期を決定します。このようにして、クリーンアップは適切に行われます。

これまで述べてきたことだけで、あなたがやろうとしていることを以下のようなコードで行うことができます。私はあなたの仕事がスレッドで行われることを前提に書きました。わずかな変更を加えて、同じロジックを使用してプロセス内でタスクを実行することができます

コードは絶対的な意味ではありません。並行コードは実際によくテストされていません。ロックは必要以上に長く保持され、ロックされていない場所(ログ機能など)

class TaskInfo { 
    public Thread Task; 
    public DateTime StartTime; 

    public TaskInfo(ParameterizedThreadStart startInfo, object startArg) { 
     Task = new Thread(startInfo); 
     Task.Start(startArg); 
     StartTime = DateTime.Now; 
    } 

} 

class Program { 

    const int MAX_THREADS = 1; 
    const int TASK_TIMEOUT = 6; // in seconds 
    const int CLEANUP_INTERVAL = TASK_TIMEOUT; // in seconds 

    public static SemaphoreSlim semaphore; 

    public static List<TaskInfo> TaskList; 
    public static object TaskListLock = new object(); 

    public static Timer CleanupTimer; 

    static void Main(string[] args) { 
     List<string> urlList = new List<string>(); 
     Log("Generating list"); 
     for (int i = 0; i < 2; i++) { 
      //adding random strings to simulate a large list of URLs to process 
      urlList.Add(Path.GetRandomFileName()); 
     } 
     Log("Queueing tasks"); 

     semaphore = new SemaphoreSlim(MAX_THREADS, MAX_THREADS); 

     Task.Run(() => QueueTasks(urlList)); 

     CleanupTimer = new Timer(CleanupTasks, null, CLEANUP_INTERVAL * 1000, CLEANUP_INTERVAL * 1000); 


     Console.ReadLine(); 
    } 

    // TODO: Guard against re-entrancy 
    static void CleanupTasks(object state) { 
     Log("CleanupTasks started"); 

     lock (TaskListLock) { 
      var now = DateTime.Now; 
      int n = TaskList.Count; 
      for (int i = n - 1; i >= 0; --i) { 
       var task = TaskList[i]; 
       Log($"Checking task with ID {task.Task.ManagedThreadId}"); 

       // kill processes running for longer than anticipated 
       if (task.Task.IsAlive && now.Subtract(task.StartTime).TotalSeconds >= TASK_TIMEOUT) { 
        Log("Cleaning up hung task"); 
        task.Task.Abort(); 
       } 

       // remove task if it is not alive 
       if (!task.Task.IsAlive) { 
        Log("Removing dead task from list"); 
        TaskList.RemoveAt(i); 
        continue; 
       } 

      } 

      if (TaskList.Count == 0) { 
       Log("Disposing cleanup thread"); 
       CleanupTimer.Dispose(); 
      } 
     } 

     Log("CleanupTasks done"); 
    } 

    static void QueueTasks(List<string> urlList) { 
     TaskList = new List<TaskInfo>(); 

     foreach (var url in urlList) { 
      Log($"Trying to schedule url = {url}"); 
      semaphore.Wait(); 
      Log("Semaphore acquired"); 

      ParameterizedThreadStart taskRoutine = obj => { 
       try { 
        DoTheThing((string)obj); 
       } finally { 
        Log("Releasing semaphore"); 
        semaphore.Release(); 
       } 
      }; 

      var task = new TaskInfo(taskRoutine, url); 
      lock (TaskListLock) 
       TaskList.Add(task); 
     } 

     Log("All tasks queued"); 
    } 

    // simulate all processes get hung 
    static void DoTheThing(string url) { 
     while (true) 
      Thread.Sleep(5000); 
    } 

    static void Log(string msg) { 
     Console.WriteLine("{0:HH:mm:ss.fff} Thread {1,2} {2}", DateTime.Now, Thread.CurrentThread.ManagedThreadId.ToString(), msg); 
    } 
} 
関連する問題