2011-11-02 11 views
7

私は、固定スレッドプールExecutorServiceの幅が10であり、リストは100 Callableであり、それぞれ20秒待って割り込みを記録しています。Java ExecutorService invokeAll()interrupting

私は別のスレッドで、そのリストにinvokeAllを呼び出し、そしてほとんどすぐにこのスレッドを中断しています。予想通りExecutorService実行が中断されますが、Callable秒で記録された割り込みの実際の数は、10予想よりはるかに多い - 周りの20-40を。 ExecutorServiceが同時に10個以上のスレッドを実行できない場合はどうしてですか?

完全なソース:(あなたが原因同時実行にもう一度それを、それを実行する必要があります)

WinXPの32ビットを使用して
@Test 
public void interrupt3() throws Exception{ 
    int callableNum = 100; 
    int executorThreadNum = 10; 
    final AtomicInteger interruptCounter = new AtomicInteger(0); 
    final ExecutorService executorService = Executors.newFixedThreadPool(executorThreadNum); 
    final List <Callable <Object>> executeds = new ArrayList <Callable <Object>>(); 
    for (int i = 0; i < callableNum; ++i) { 
     executeds.add(new Waiter(interruptCounter)); 
    } 
    Thread watcher = new Thread(new Runnable() { 

     @Override 
     public void run(){ 
      try { 
       executorService.invokeAll(executeds); 
      } catch(InterruptedException ex) { 
       // NOOP 
      } 
     } 
    }); 
    watcher.start(); 
    Thread.sleep(200); 
    watcher.interrupt(); 
    Thread.sleep(200); 
    assertEquals(10, interruptCounter.get()); 
} 

// This class just waits for 20 seconds, recording it's interrupts 
private class Waiter implements Callable <Object> { 
    private AtomicInteger interruptCounter; 

    public Waiter(AtomicInteger interruptCounter){ 
     this.interruptCounter = interruptCounter; 
    } 

    @Override 
    public Object call() throws Exception{ 
     try { 
      Thread.sleep(20000); 
     } catch(InterruptedException ex) { 
      interruptCounter.getAndIncrement(); 
     } 
     return null; 
    } 
} 

、オラクルJRE 1.6.0_27とJUnit4

+0

Hmm ...メインメソッドを使ってプログラムに変換すると、私は常に10を得ています...(WindowsのJava 7) –

+0

同じことを行って、37(1.6.0_27、Windows XP)を取得しました。テストするJava 7を持っていない、誰かが確認できますか? –

+0

私は仕事に挑戦します。多分Java 6のバグです... –

答えて

4

私は仮説に反対10回の割り込みだけを受け取るようにしてください。

Assume the CPU has 1 core. 
1. Main thread starts Watcher and sleeps 
2. Watcher starts and adds 100 Waiters then blocks 
3. Waiter 1-10 start and sleep in sequence 
4. Main wakes and interrupts Watcher then sleeps 
5. Watcher cancels Waiter 1-5 then is yielded by the OS (now we have 5 interrupts) 
6. Waiter 11-13 start and sleep 
7. Watcher cancels Waiter 6-20 then is yielded by the OS (now we have 13 interrupts) 
8. Waiter 14-20 are "started" resulting in a no-op 
9. Waiter 21-24 start and sleep 
.... 

基本的に、私の議論は、それがタイムスライスをもたらすとExecutorServiceののワーカースレッドがより開始することを可能にする必要があります前に、ウォッチャーのスレッドはすべて100「ウェイター」RunnableFutureインスタンスをキャンセルするために許可されるという保証がないことですウェイターの仕事。

更新:AbstractExecutorService

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
    throws InterruptedException { 
    if (tasks == null) 
     throw new NullPointerException(); 
    List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 
    boolean done = false; 
    try { 
     for (Callable<T> t : tasks) { 
      RunnableFuture<T> f = newTaskFor(t); 
      futures.add(f); 
      execute(f); 
     } 
     for (Future<T> f : futures) { 
      if (!f.isDone()) { 
       try { 
        f.get(); //If interrupted, this is where the InterruptedException will be thrown from 
       } catch (CancellationException ignore) { 
       } catch (ExecutionException ignore) { 
       } 
      } 
     } 
     done = true; 
     return futures; 
    } finally { 
     if (!done) 
      for (Future<T> f : futures) 
       f.cancel(true); //Specifying "true" is what allows an interrupt to be sent to the ExecutorService's worker threads 
    } 
} 

からコードを表示f.cancel(true)が含まれていますfinallyブロックは、割り込みが現在実行中のタスクに伝播するだろうというときです。あなたが見ることができるように、これはタイトなループですが、ループを実行するスレッドは1つのタイムスライスでFutureのすべてのインスタンスを反復処理することができるだろう保証はありません。

+0

だから、あなたは、のその中断を言う 'invokeAll()'すべてのキューに入れられたタスクの即時取り消しを意味するものではありませんものを実行しているの中断前?私にとっては、少なくとも驚きの原則の直接の破損のように見えます。 –

+1

修正。タスクを処理するワーカースレッドは、 'invokeAll()'を実行するスレッドとは異なります。あるスレッドで割り込みを呼び出しても他のスレッドは中断されているわけではないので、時には10スレッド以上の割り込みを受け取ることができて驚いたことはありません。私が投稿した注釈付きコードで言及したように、割り込みは 'Future.cancel'メソッドに渡されたブール型引数の長所としてタスクを処理するワーカースレッドにのみ送られます。 –

0
PowerMock.mockStatic (Executors.class); 
EasyMock.expect (Executors.newFixedThreadPool (9)).andReturn (executorService); 

Future<MyObject> callableMock = (Future<MyObject>) 
EasyMock.createMock (Future.class); 
EasyMock.expect (callableMock.get (EasyMock.anyLong(), EasyMock.isA (TimeUnit.class))).andReturn (ccs).anyTimes(); 

List<Future<MyObject>> futures = new ArrayList<Future<MyObject>>(); 
futures.add (callableMock); 
EasyMock.expect (executorService.invokeAll (EasyMock.isA (List.class))).andReturn (futures).anyTimes(); 

executorService.shutdown(); 
EasyMock.expectLastCall().anyTimes(); 

EasyMock.expect (mock.getMethodCall ()).andReturn (result).anyTimes(); 

PowerMock.replayAll(); 
EasyMock.replay (callableMock, executorService, mock); 

Assert.assertEquals (" ", answer.get (0)); 
PowerMock.verifyAll(); 
1

あなたは

ArrayList<Runnable> runnables = new ArrayList<Runnable>(); 
    executorService.getQueue().drainTo(runnables); 

は、スレッドプールを中断する前に、このブロックを追加し、同じ動作を実現します。

これは、新しいリストにすべての待ちキューを排出します。

だから、それだけで実行中のスレッドを中断します。

関連する問題