2012-03-08 8 views
6

私はcorePoolSizemaximumPoolSizeを設定することができるThreadPoolExecutorを探しています。待ち行列はすぐにスレッドプールにタスクを渡して、新しいスレッドを作成してmaximumPoolSizeに達してからキューに追加されます。Java ThreadPoolExecutor戦略、キューを使用した 'Direct Handoff'?

そんなことはありますか?もしそうでなければ、そのような戦略を持っていないのは何らかの正当な理由がありますか?

私が本質的に望むのは、タスクが実行のために提出され、スレッドが多すぎてパフォーマンスが「最悪」になる(maximumPoolSizeを設定する)点に達すると、新しいスレッドの追加が中止されます。そのスレッドプールで作業してキューイングを開始すると、キューがいっぱいになるとそれは拒否されます。

そして、負荷が戻ってくると、未使用のスレッドをcorePoolSizeに戻して解体することができます。

これは「三つの一般的な戦略」http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html

+0

可能な重複:http://stackoverflow.com/questions/1800317/impossible-to-make-a-cached-thread-pool-with- a-size-limit – mnicky

答えて

3

に記載されているノートよりも、自分のアプリケーションでは、私に多くの意味があります:これらの実装はやや不備や非決定的です。このコードを使用する前に、全回答とコメントをお読みください。

エグゼキュータが最大プールサイズを下回っているときにアイテムを拒否する作業キューを作成し、最大に達した後にアイテムを受け入れる方法はありますか?

これは、文書化の挙動に依存しています:

「要求が、その場合には、タスクが を拒否され、この がmaximumPoolSizeを超えない限り、新しいスレッドが作成され、キューに入れることができない場合。」

public class ExecutorTest 
{ 
    private static final int CORE_POOL_SIZE = 2; 
    private static final int MAXIMUM_POOL_SIZE = 4; 
    private static final int KEEP_ALIVE_TIME_MS = 5000; 

    public static void main(String[] args) 
    { 
     final SaturateExecutorBlockingQueue workQueue = 
      new SaturateExecutorBlockingQueue(); 

     final ThreadPoolExecutor executor = 
      new ThreadPoolExecutor(CORE_POOL_SIZE, 
        MAXIMUM_POOL_SIZE, 
        KEEP_ALIVE_TIME_MS, 
        TimeUnit.MILLISECONDS, 
        workQueue); 

     workQueue.setExecutor(executor); 

     for (int i = 0; i < 6; i++) 
     { 
      final int index = i; 
      executor.submit(new Runnable() 
      { 
       public void run() 
       { 
        try 
        { 
         Thread.sleep(1000); 
        } 
        catch (InterruptedException e) 
        { 
         e.printStackTrace(); 
        } 

        System.out.println("Runnable " + index 
          + " on thread: " + Thread.currentThread()); 
       } 
      }); 
     } 
    } 

    public static class SaturateExecutorBlockingQueue 
     extends LinkedBlockingQueue<Runnable> 
    { 
     private ThreadPoolExecutor executor; 

     public void setExecutor(ThreadPoolExecutor executor) 
     { 
      this.executor = executor; 
     } 

     public boolean offer(Runnable e) 
     { 
      if (executor.getPoolSize() < executor.getMaximumPoolSize()) 
      { 
       return false; 
      } 
      return super.offer(e); 
     } 
    } 
} 

注:私はcorePoolSize < maximumPoolSizeで構成ThreadPoolExecutorのデフォルトの動作であることをご希望の動作を期待するのであなたの質問は私を驚かせました。しかし、あなたが指摘しているように、ThreadPoolExecutorのJavaDocは、そうでないことを明確に述べています。


アイデア#2

私はおそらく、少し良いアプローチであるものを持っていると思います。それはThreadPoolExecutorsetCorePoolSizeメソッドにコード化された副作用の振る舞いに依存しています。このアイデアは、作業項目がエンキューされるときに、コア・プール・サイズを一時的かつ条件付きで増加させることです。コアプールのサイズを大きくすると、ThreadPoolExecutorは、キューに入れられたすべての(queue.size())タスクを実行するのに十分な新しいスレッドをすぐに生成します。次に、コア・プール・サイズを即座に減らします。これにより、スレッド・プールは、低アクティビティーの将来の期間中に自然に縮小することができます。このアプローチはまだ完全に決定論的ではありません(プールサイズが最大プールサイズよりも大きくなる可能性があります)が、ほとんどの場合、最初の戦略よりも優れていると思います。

具体的には、私はので、このアプローチは最初のものより優れていると考えている:それはそれは私がしたいレース

  • の結果として実行を拒否しないであろうより頻繁に
  • スレッドを再利用します

    1. 最初のアプローチでは、非常に軽い使用であってもスレッドプールが最大サイズまで拡大することに再度言及します。このアプローチは、その点ではるかに効率的でなければなりません。

    -

    public class ExecutorTest2 
    { 
        private static final int KEEP_ALIVE_TIME_MS = 5000; 
        private static final int CORE_POOL_SIZE = 2; 
        private static final int MAXIMUM_POOL_SIZE = 4; 
    
        public static void main(String[] args) throws InterruptedException 
        { 
         final SaturateExecutorBlockingQueue workQueue = 
          new SaturateExecutorBlockingQueue(CORE_POOL_SIZE, 
            MAXIMUM_POOL_SIZE); 
    
         final ThreadPoolExecutor executor = 
          new ThreadPoolExecutor(CORE_POOL_SIZE, 
            MAXIMUM_POOL_SIZE, 
            KEEP_ALIVE_TIME_MS, 
            TimeUnit.MILLISECONDS, 
            workQueue); 
    
         workQueue.setExecutor(executor); 
    
         for (int i = 0; i < 60; i++) 
         { 
          final int index = i; 
          executor.submit(new Runnable() 
          { 
           public void run() 
           { 
            try 
            { 
             Thread.sleep(1000); 
            } 
            catch (InterruptedException e) 
            { 
             e.printStackTrace(); 
            } 
    
            System.out.println("Runnable " + index 
              + " on thread: " + Thread.currentThread() 
              + " poolSize: " + executor.getPoolSize()); 
           } 
          }); 
         } 
    
         executor.shutdown(); 
    
         executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); 
        } 
    
        public static class SaturateExecutorBlockingQueue 
         extends LinkedBlockingQueue<Runnable> 
        { 
         private final int corePoolSize; 
         private final int maximumPoolSize; 
         private ThreadPoolExecutor executor; 
    
         public SaturateExecutorBlockingQueue(int corePoolSize, 
           int maximumPoolSize) 
         { 
          this.corePoolSize = corePoolSize; 
          this.maximumPoolSize = maximumPoolSize; 
         } 
    
         public void setExecutor(ThreadPoolExecutor executor) 
         { 
          this.executor = executor; 
         } 
    
         public boolean offer(Runnable e) 
         { 
          if (super.offer(e) == false) 
          { 
           return false; 
          } 
          // Uncomment one or both of the below lines to increase 
          // the likelyhood of the threadpool reusing an existing thread 
          // vs. spawning a new one. 
          //Thread.yield(); 
          //Thread.sleep(0); 
          int currentPoolSize = executor.getPoolSize(); 
          if (currentPoolSize < maximumPoolSize 
            && currentPoolSize >= corePoolSize) 
          { 
           executor.setCorePoolSize(currentPoolSize + 1); 
           executor.setCorePoolSize(corePoolSize); 
          } 
          return true; 
         } 
        } 
    } 
    
  • +0

    あなたが示唆したことは、私が今削除された答えのコメントで言ったことです。とにかく。私はあなたがオーバーライドしているオファーがロックを持たないので問題を引き起こすかもしれないと思います。たとえそれが3つの命令だとしても?またはそれほど長い。 –

    +0

    @RonaldChan私は同じことを心配していましたが、私は潜在的な問題を理由づけようとしていましたが、私はまだ誰にも打撃を与えていません。私はそれを試し続けていきます。とにかくあなたの答えを受け入れてください。 –

    +0

    これは、私が最後に実装したものです。ThreadPoolExectuorのexecuteメソッドを書き直すのではなく、もっと複雑です。参考までに、maximumPoolSizeにバッファを追加して、レースを可能にする実際のバッファの値よりも5%低くします。理想的な方法ではありませんが、ThreadPoolExecutorのデフォルトの動作が奇妙で、私が記述した動作を持つ外部ThreadPools実装を見つけることができません。もしあれば、私は受け入れられた答えを変更することを検討します。 –

    2

    私たちは、次のコードでその問題への解決策を見つけた:

    をこのキューは、ハイブリッドSynchronousQueue/LinkedBlockingQueueです。

    public class OverflowingSynchronousQueue<E> extends LinkedBlockingQueue<E> { 
        private static final long serialVersionUID = 1L; 
    
        private SynchronousQueue<E> synchronousQueue = new SynchronousQueue<E>(); 
    
        public OverflowingSynchronousQueue() { 
        super(); 
        } 
    
        public OverflowingSynchronousQueue(int capacity) { 
        super(capacity); 
        } 
    
        @Override 
        public boolean offer(E e) { 
        // Create a new thread or wake an idled thread 
        return synchronousQueue.offer(e); 
        } 
    
        public boolean offerToOverflowingQueue(E e) { 
        // Add to queue 
        return super.offer(e); 
        } 
    
        @Override 
        public E take() throws InterruptedException { 
        // Return tasks from queue, if any, without blocking 
        E task = super.poll(); 
        if (task != null) { 
         return task; 
        } else { 
         // Block on the SynchronousQueue take 
         return synchronousQueue.take(); 
        } 
        } 
    
        @Override 
        public E poll(long timeout, TimeUnit unit) throws InterruptedException { 
        // Return tasks from queue, if any, without blocking 
        E task = super.poll(); 
        if (task != null) { 
         return task; 
        } else { 
         // Block on the SynchronousQueue poll 
         return synchronousQueue.poll(timeout, unit); 
        } 
        } 
    

    }

    それが機能するためには、我々はタスクが拒否された場合、「offerToOverflowingQueue」を呼び出すためのRejectedExecutionHandlerをラップする必要があります。我々はThreadPoolExecutorを作成する方法ここで

    public class OverflowingRejectionPolicyAdapter implements RejectedExecutionHandler { 
    
        private OverflowingSynchronousQueue<Runnable> queue; 
        private RejectedExecutionHandler adaptedRejectedExecutionHandler; 
    
        public OverflowingRejectionPolicyAdapter(OverflowingSynchronousQueue<Runnable> queue, 
                  RejectedExecutionHandler adaptedRejectedExecutionHandler) 
        { 
        super(); 
        this.queue = queue; 
        this.adaptedRejectedExecutionHandler = adaptedRejectedExecutionHandler; 
        } 
    
        @Override 
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
        if (!queue.offerToOverflowingQueue(r)) { 
         adaptedRejectedExecutionHandler.rejectedExecution(r, executor); 
        } 
        } 
    } 
    

    public static ExecutorService newSaturatingThreadPool(int corePoolSize, 
                     int maxPoolSize, 
                     int maxQueueSize, 
                     long keepAliveTime, 
                     TimeUnit timeUnit, 
                     String threadNamePrefix, 
                     RejectedExecutionHandler rejectedExecutionHandler) 
        { 
        OverflowingSynchronousQueue<Runnable> queue = new OverflowingSynchronousQueue<Runnable>(maxQueueSize); 
        OverflowingRejectionPolicyAdapter rejectionPolicyAdapter = new OverflowingRejectionPolicyAdapter(queue, 
                                rejectedExecutionHandler); 
        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, 
                     maxPoolSize, 
                     keepAliveTime, 
                     timeUnit, 
                     queue, 
                     new NamedThreadFactory(threadNamePrefix), 
                     rejectionPolicyAdapter); 
        return executor; 
    } 
    
    関連する問題