2012-04-10 10 views
8

BoundedExecutorの実装については、Java Concurrency in Practiceの奇妙なことがあります。Java Concurrency in Practice:BoundedExecutorの競合状態ですか?

Executorにキューイングされているか実行中のスレッドが十分にある場合に、サブミットスレッドをブロックすることによってExecutorにタスクの送信を抑制することが想定されています。

これは(catch節で不足している再スローを追加した後)の実装です:

public class BoundedExecutor { 
    private final Executor exec; 
    private final Semaphore semaphore; 

    public BoundedExecutor(Executor exec, int bound) { 
     this.exec = exec; 
     this.semaphore = new Semaphore(bound); 
    } 

    public void submitTask(final Runnable command) throws InterruptedException, RejectedExecutionException { 
     semaphore.acquire(); 

     try { 
      exec.execute(new Runnable() { 
       @Override public void run() { 
        try { 
         command.run(); 
        } finally { 
         semaphore.release(); 
        } 
       } 
      }); 
     } catch (RejectedExecutionException e) { 
      semaphore.release(); 
      throw e; 
     } 
    } 

私はExecutors.newCachedThreadPool()と4のバウンドでBoundedExecutorをインスタンス化するとき、私はによってインスタンス化スレッドの数を期待しますキャッシュされたスレッドプールは決して4を超えることはありません。しかし、実際にはそうです。私は同じくらい11としてスレッドを作成するには、この小さなテストプログラムを得ている:

public static void main(String[] args) throws Exception { 
    class CountingThreadFactory implements ThreadFactory { 
     int count; 

     @Override public Thread newThread(Runnable r) { 
      ++count; 
      return new Thread(r); 
     }   
    } 

    List<Integer> counts = new ArrayList<Integer>(); 

    for (int n = 0; n < 100; ++n) { 
     CountingThreadFactory countingThreadFactory = new CountingThreadFactory(); 
     ExecutorService exec = Executors.newCachedThreadPool(countingThreadFactory); 

     try { 
      BoundedExecutor be = new BoundedExecutor(exec, 4); 

      for (int i = 0; i < 20000; ++i) { 
       be.submitTask(new Runnable() { 
        @Override public void run() {} 
       }); 
      } 
     } finally { 
      exec.shutdown(); 
     } 

     counts.add(countingThreadFactory.count); 
    } 

    System.out.println(Collections.max(counts)); 
} 

私は別のスレッドが許可証をAQUIREできるセマフォの解放とタスクの終了との間に小さな小さな時間枠は、あると思うし、解放スレッドがまだ終了していない間にタスクをサブミットしてください。言い換えれば、それは競合状態にある。

誰かがこれを確認できますか?

+1

私はsemaphore.release()の直後に1msのThread.sleepを追加して、どれくらい悪くなったかを確認しました。私は300以上のスレッドを作成しました。 – toto2

答えて

2

私は一度に9つのスレッドを作成しました。私は、必要以上のスレッドが存在する競合状態があると考えています。

これは、実行するタスク作業の前後にある可能性があります。つまり、コードブロック内にスレッドが4つしかないにもかかわらず、前のタスクを停止しているスレッドや新しいタスクを開始する準備ができているスレッドがいくつかあります。

つまり、スレッドは実行中にrelease()を実行します。その最後のことではなく、新しい仕事を獲得する前に行うことです。

+0

私はあなたの声明に同意します。これは基本的にBossieが最初に疑ったものです。しかし、私はこれが何らかのプログラミングエラーを意味するので、これを競合状態と呼ぶかどうかはわかりません。私は、プログラムが書かれているようにスレッドの最大数を持つとは思わないと思います。 – toto2

+0

@ toto2通常のプログラミングバグではありませんが、リリースされているセマフォとスレッドが新しいタスクを取得する間に競合状態があります。あなたが長生きしていたタスクがめったにこの動作を見ることができない場合。 –

+1

私は仕事の長さについて理解しています(Bossieの質問に対する私のコメントを参照)。私は、バグではないことを意味します。なぜなら、4つのスレッドがあるはずがないからです。 「これはバグではなく、機能です!」この場合、私は誰かが本当にその主張をすることができると思う。申し訳ありません...私はここで哲学的になっています。 – toto2

5

あなたは競合状態の分析に間違いはありません。 ExecutorService &セマフォとの間で同期の保証はありません。

しかし、スレッド数を制限するのがBoundedExecutorが使用されているかどうかはわかりません。私はそれがサービスに提出されたタスクの数を絞るためにより多くのものだと思う。 500万件のタスクを提出する必要があるとしたら、そのうち10,000件以上を提出すると、あなたは不足しているとします。

あなたはいつでも4つのスレッドを実行することができます。なぜ5百万のタスクをすべて試行したいのですか?任意の時点でキューに入れられたタスクの数を調整するために、これに似た構造を使用することができます。あなたがこれから出なければならないのは、いつでも4つのタスクしか実行していないということです。

明らかに、これはExecutors.newFixedThreadPool(4)を使用することです。

+1

正確に。 'command'を実行する基本executorに提出されたタスクは、完了する前にセマフォを解放しますが、これはタスクを実行するexecutorスレッドで発生します。これにより、別のタスクがエグゼキュータに提出され、新しいスレッドに与えられ、理論的には、適切な条件が与えられたときに 'bound 'の2倍のスレッドが許可されます。 –

+1

@ジョン:提出されたタスクの数は抑えられますが、予測不可能で信頼性の低い方法です。スレッドが不運にもスケジュールされていると、多くのサブミットスレッドが潜んでいる可能性があります。また、 'newFixedThreadPool()'の場合、これらのタスクは 'Executor'のアンバウンドキューメモリ不足の危険があります。 @David:4のバウンドの11個のスレッドを取得しました。また、Java並行処理に関するリファレンスブックにも競合条件が残っていると面白いと思います。 –

+0

@Bossieあなたは正しいですが、あなたはまだnewFixedThreadPoolでOOMの疑いがありますが、余分なスレッドが作成されるのを防ぐでしょう。私が言及したように、一度に何百万ものタスクが提出されるのを防ぐために、バインドを使用します。 –

10

BoundedExecutorは、実際にはスレッドのプールサイズにバインドを設定する方法としてではなく、タスクの送信を抑制する方法を示したものです。少なくとも1つのコメントが指摘しているように、後者を達成するためのより直接的な方法があります。

しかし、他の回答がアンバウンド形式のキューを使用して無制限のキューとプールサイズを言及することで

set the bound on the semaphore to be equal to the pool size plus the number of queued tasks you want to allow, since the semaphore is bounding the number of tasks both currently executing and awaiting execution. [JCiP, end of section 8.3.3]

にすると言う本の中のテキストを言及していない、我々は(明らかにない非常に明確に)を意味し、有界サイズのスレッドプールの使用

しかし、私がBoundedExecutorについて悩ましてきたことは、ExecutorServiceインターフェイスを実装していないことです。同様の機能を実現し、標準インターフェースを実装する現代的な方法は、GuavaのlisteningDecoratorメソッドとForwardingListeningExecutorServiceクラスを使用することです。

+0

エキスパートからの回答です。素晴らしいです。物事を少し明確にしてくれてありがとう。それらの 'ListenableFuture'sは面白く見えます。だから、私はいくつかのタスクを提出しなければなりません。そして、完了するたびに、私はコールバックで新しいものを提出します。 –

+0

+1 from the source –

+0

@Bossie - あなたはそうすることができますが、私は、BoundedExecutorServiceのSemaphoreアプローチを引き続き、acquireでコールバックとリリースでコールバックを装飾することで使用できることを意味しました。 –