2009-09-12 26 views
5

私はJavaでタスクのキューを持っています。このキューはDB内のテーブルにあります。M個のスレッド(1タスクにつき1個)を管理してN個のスレッドのみを同時に管理する方法。 N <MでJavaで

私のことを行う必要がありますのみ

  • これ以上のN個のスレッドが同時に実行されているよりもタスクごと

    • 1スレッド。これはスレッドにDBのやり取りがあり、DB接続の束を開けたくないからです。

    私はのような何かを行うことができると思います:

    final Semaphore semaphore = new Semaphore(N); 
    while (isOnJob) { 
        List<JobTask> tasks = getJobTasks(); 
        if (!tasks.isEmpty()) { 
         final CountDownLatch cdl = new CountDownLatch(tasks.size()); 
         for (final JobTask task : tasks) { 
          Thread tr = new Thread(new Runnable() { 
    
           @Override 
           public void run() { 
            semaphore.acquire(); 
            task.doWork(); 
            semaphore.release(); 
            cdl.countDown(); 
           } 
    
          }); 
         } 
         cdl.await(); 
        } 
    } 
    

    私はExecutorServiceのクラスが存在することを知っているが、私はそれは私がこれのためにそれを使用できるかどうかわからないが。

    だから、これがこれを行う最善の方法だと思いますか?または、これを解決するためにExecutorServiceがどのように動作するかを明確にすることはできますか?

    最終的な解決策:awnsersため

    while (isOnJob) { 
        ExecutorService executor = Executors.newFixedThreadPool(N); 
        List<JobTask> tasks = getJobTasks(); 
        if (!tasks.isEmpty()) { 
         for (final JobTask task : tasks) { 
          executor.submit(new Runnable() { 
    
           @Override 
           public void run() { 
            task.doWork(); 
           } 
    
          }); 
         } 
        } 
        executor.shutdown(); 
        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.HOURS); 
    } 
    

    おかげでたくさん:

    私は最善の解決策のようなものだと思います。 BTW私は接続プールを使用していますが、DBへのクエリは非常に重く、私は同時に管理されていない数のタスクを持ちたくありません。

  • 答えて

    7

    実際にはExecutorServiceを使用できます。たとえば、newFixedThreadPoolメソッドを使用して新しい固定スレッドプールを作成します。この方法では、スレッドのキャッシング以外に、nスレッドが同時に実行されていることを保証します。これらの線に沿って

    何か:あなたは、もはや必要に応じて、完了するために、計算のを待ちますgetとして、ラッチを使用する必要はありませ

    private static final ExecutorService executor = Executors.newFixedThreadPool(N); 
    // ... 
    while (isOnJob) { 
        List<JobTask> tasks = getJobTasks(); 
        if (!tasks.isEmpty()) { 
         List<Future<?>> futures = new ArrayList<Future<?>>(); 
         for (final JobTask task : tasks) { 
           Future<?> future = executor.submit(new Runnable() {  
             @Override 
             public void run() { 
               task.doWork(); 
             } 
           }); 
           futures.add(future); 
         } 
         // you no longer need to use await 
         for (Future<?> fut : futures) { 
          fut.get(); 
         } 
        } 
    } 
    

    注意。

    +0

    だから私はセマフォーも必要ないと思われますか? – user2427

    +0

    はい、削除することもできます。 –

    0

    パフォーマンスの向上は、スレッドで実行する必要がある作業の種類によっても異なります。 DBが処理のボトルネックになっている場合は、スレッドがDBにアクセスする方法に注意を払うことになります。接続プールを使用するのはおそらく順番です。これは、ワーカースレッドがプールからDB接続を再利用できるので、より多くのスループットを達成するのに役立ちます。

    4

    私はJGに同意します。ExecutorServiceは行く方法です...しかし、あなたは両方ともそれが必要以上に複雑になっていると思います。

    むしろ理由だけではなく(Executors.newFixedThreadPool(N)で)固定サイズのスレッドプールを作成し、そこにすべてのタスクを提出スレッド(タスクあたり1)の大規模な数を作成するよりも?セマフォなどの必要はありません。スレッドプールに取得したジョブをサブミットし、スレッドプールはまで最大で個のスレッドで処理します。

    一度にN個以上のスレッドを使用しない場合、なぜそれらを作成したいですか?

    1

    ThreadPoolExecutorインスタンスには、スレッドのアンバウンドキューと固定最大サイズがあります。 Executors.newFixedThreadPool(N)。これは多数のタスクを受け入れますが、同時に実行するのはNです。代わりに

    あなたは(Nの容量を持つ)の代わりに有界キューを選択した場合エグゼキュータは、タスクの実行を拒否します(ポリシーにどのように依存するかを正確に直接ThreadPoolExecutorで作業するときに設定することができ、 エグゼキュータを工場で使用する - RejectedExecutionHandlerを参照してください。

    あなたはNの容量を持つセットアップバウンドBlockingQueueのをすべき「本当の」輻輳制御が必要な場合。データベースから実行したいタスクを取り出し、キューに入れてください。もしフルであれば、呼び出しスレッドがブロックされます。別のスレッドでは、BlockingQueueのからタスクを取るとエグゼキュータに提出(おそらくもキュータ APIを使用して開始)。 BlockingQueueが空の場合、呼び出しスレッドもブロックされます。完了したことを通知するには、「特別な」オブジェクト(キューの最後/最後のアイテムをマークするシングルトンなど)を使用します。

    関連する問題