2011-03-10 13 views
5

CompletionServiceを使用していくつかのFutureタスクを送信します。FixedThreadPool ExecutorService 2スレッドをラップして、設定したタスクの数と等しいループを設定し、completionserviceを使用します。それらをすべて完了または失敗させるのを待つtake()。私はtake()メソッドをpoll(300、Timeout.SECONDS)に変更しました.1つのタスクが完了するまでに5分以上かかると考えられます投票は失敗し、最終的にはループから抜け出し、すべての未来を巡ってfuture.cancel(true)を呼び出して問題のタスクを強制的に取り消すことができます。CompletionServiceを使用して長すぎるタスクをキャンセルするには

しかし、コードを実行してハングすると、ポーリングが5分に1回連続して失敗し、それ以上のタスクが実行されないので、2人の作業者が何らかの方法でデッドロックされ、始めること。タイムアウトは5分で、実行するタスクがまだ1000件あったため、ループを解除するには時間がかかりすぎてジョブがキャンセルされました。

私がやりたいことは、5分後にhasntが完了した場合、現在のタスクを中断/強制解除することですが、それを行う方法は何も見えません。

このコードサンプルは、Imはあなたの労働者の例では約

import com.jthink.jaikoz.exception.JaikozException; 
import java.util.ArrayList; 
import java.util.Collection; 
import java.util.List; 
import java.util.concurrent.*; 

public class CompletionServiceTest 
{ 
    public static void main(final String[] args) 
    { 
     CompletionService<Boolean> cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(2)); 
     Collection<Worker> tasks = new ArrayList<Worker>(10); 
     tasks.add(new Worker(1)); 
     tasks.add(new Worker(2)); 
     tasks.add(new Worker(3)); 
     tasks.add(new Worker(4)); 
     tasks.add(new Worker(5)); 
     tasks.add(new Worker(6)); 

     List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size()); 
     try 
     { 
      for (Callable task : tasks) 
      { 
       futures.add(cs.submit(task)); 
      } 
      for (int t = 0; t < futures.size(); t++) 
      { 
       Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS); 
       if(result==null) 
       { 
        System.out.println("Worker TimedOut:"); 
        continue; 
       } 
       else 
       { 
        try 
        { 
         if(result.isDone() && result.get()) 
         { 
          System.out.println("Worker Completed:"); 
         } 
         else 
         { 
          System.out.println("Worker Failed"); 
         } 
        } 
        catch (ExecutionException ee) 
        { 
         ee.printStackTrace(); 
        } 
       } 
      } 
     } 
     catch (InterruptedException ie) 
     { 
     } 
     finally 
     { 
      //Cancel by interrupting any existing tasks currently running in Executor Service 
      for (Future<Boolean> f : futures) 
      { 
       f.cancel(true); 
      } 
     } 
     System.out.println("Done"); 
    } 
} 

class Worker implements Callable<Boolean> 
{ 
    private int number; 
    public Worker(int number) 
    { 
     this.number=number; 
    } 

    public Boolean call() 
    { 
     if(number==3) 
     { 
      try 
      { 
       Thread.sleep(50000); 
      } 
      catch(InterruptedException tie) 
      { 

      } 
     } 
     return true; 
    } 
} 

出力

Worker Completed: 
Worker Completed: 
Worker Completed: 
Worker Completed: 
Worker Completed: 
Worker TimedOut: 
Done 
+0

@ user294896 - いくつかのサンプルコードを小さな独立した例で提供できますか? – justkt

+0

@justkt少しでも時間がかかるかもしれません –

+0

なぜタスク自体が時間がかかり過ぎて中断してしまうのはなぜですか?これは物事を大幅に簡素化するでしょう。 – trojanfoe

答えて

4

私はそれを解決したと思う、基本的にタイムアウトが発生した場合、私は将来のオブジェクトのリストを繰り返し、完了していない最初のものを見つけ、強制的に取り消す。エレガントではないようですが、うまくいくようです。

解決策をよりよく示すが、2つのスレッドプールでも機能する出力を表示するには、プールのサイズを変更しました。

import java.util.ArrayList; 
import java.util.Collection; 
import java.util.Date; 
import java.util.List; 
import java.util.concurrent.*; 

public class CompletionServiceTest 
{ 
    public static void main(final String[] args) 
    { 
     CompletionService<Boolean> cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(1)); 
     Collection<Worker> tasks = new ArrayList<Worker>(10); 
     tasks.add(new Worker(1)); 
     tasks.add(new Worker(2)); 
     tasks.add(new Worker(3)); 
     tasks.add(new Worker(4)); 
     tasks.add(new Worker(5)); 
     tasks.add(new Worker(6)); 

     List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size()); 
     try 
     { 
      for (Callable task : tasks) 
      { 
       futures.add(cs.submit(task)); 
      } 
      for (int t = 0; t < futures.size(); t++) 
      { 
       System.out.println("Invocation:"+t); 
       Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS); 
       if(result==null) 
       { 
        System.out.println(new Date()+":Worker Timedout:"); 
        //So lets cancel the first futures we find that havent completed 
        for(Future future:futures) 
        { 
         System.out.println("Checking future"); 
         if(future.isDone()) 
         { 
          continue; 
         } 
         else 
         { 
          future.cancel(true); 
          System.out.println("Cancelled"); 
          break; 
         } 
        } 
        continue; 
       } 
       else 
       { 
        try 
        { 
         if(result.isDone() && !result.isCancelled() && result.get()) 
         { 
          System.out.println(new Date()+":Worker Completed:"); 
         } 
         else if(result.isDone() && !result.isCancelled() && !result.get()) 
         { 
          System.out.println(new Date()+":Worker Failed"); 
         } 
        } 
        catch (ExecutionException ee) 
        { 
         ee.printStackTrace(System.out); 
        } 
       } 
      } 
     } 
     catch (InterruptedException ie) 
     { 
     } 
     finally 
     { 
      //Cancel by interrupting any existing tasks currently running in Executor Service 
      for (Future<Boolean> f : futures) 
      { 
       f.cancel(true); 
      } 
     } 
     System.out.println(new Date()+":Done"); 
    } 
} 

class Worker implements Callable<Boolean> 
{ 
    private int number; 
    public Worker(int number) 
    { 
     this.number=number; 
    } 

    public Boolean call() 
     throws InterruptedException 
    { 
     try 
     { 
      if(number==3) 
      { 
       Thread.sleep(50000); 
      } 
     } 
     catch(InterruptedException ie) 
     { 
      System.out.println("Worker Interuppted"); 
      throw ie; 
     } 
     return true; 
    } 
} 

出力は

Invocation:0 
Thu Mar 10 20:51:39 GMT 2011:Worker Completed: 
Invocation:1 
Thu Mar 10 20:51:39 GMT 2011:Worker Completed: 
Invocation:2 
Thu Mar 10 20:51:49 GMT 2011:Worker Timedout: 
Checking future 
Checking future 
Checking future 
Cancelled 
Invocation:3 
Worker Interuppted 
Invocation:4 
Thu Mar 10 20:51:49 GMT 2011:Worker Completed: 
Invocation:5 
Thu Mar 10 20:51:49 GMT 2011:Worker Completed: 
Thu Mar 10 20:51:49 GMT 2011:Done 
2

を話して何の簡易版を示し、あなたの呼び出し可能な中断をサポートコールでブロックしています。あなたの本当のコードが本物のロック(​​ブロック)でデッドロックしている場合は、中断によってキャンセルすることはできません。代わりに明示的なロック(java.util.concurrent.Lock)を使用すると、ロック取得の待機時間を指定できます。デッドロック状況が発生した可能性があるため、スレッドがロックを待ってタイムアウトした場合、エラーメッセージが表示されて中止されます。

ところで、あなたの例では、CallableInterruptedExceptionを飲み込むべきではありません。それを渡すか、メソッド宣言のthrows行にInterruptedExceptionを追加するか、catchブロックでスレッドの中断状態をリセットします(Thread.currentThread().interrupt())。

+0

ワーカーコードは実際にコードを実行するために何かを入れている実際のワーカーを表すものではありません。問題はCompletionService/ExecutorServiceと関係があります。問題が発生していると思われるサービスからタスクをキャンセルするにはどうすればいいですか? –

+0

ps:コード内の戦略的ポイントでThread.currentThread()。isInterrupted()もチェックする必要があります。 InterruptedExceptionは、それをサポートするブロッキング操作でない限り、一般にスローされません。 isInterrupted()メソッドを使用して割り込みが発生したかどうかを確認し、その後にタスクとクリーンアップを停止するかどうかはあなた次第です。 – Matt

1

であるあなたはいつも、あなたはfuture.cancel()を呼び出すことができます...それはまだ完了していない場合には、タイムアウト例外を返しますfuture.get(timeout...)
を呼び出すことができます。

関連する問題