2017-07-29 17 views
3

任意のScheduledExecutorServiceに固定レートでスケジュールされたコマンドがある場合、キャンセルできるScheduledFutureも返します。 "cancel"は、キャンセルが返された後、コマンドがまだ実行されていないことを保証していません。ShceduledFutureをキャンセルして、runnableが中止されるのを待っていますか?

ほとんどの場合、十分な機能です。しかし、キャンセル後に現在のスレッドをブロックする必要があるときに、コマンドがすでに進行中であれば、私はusecaseを処理し、コマンドが完了するまで待つ。言い換えれば、キャンセルを呼び出したスレッドは、コマンドがまだ実行中であれば、先に進むべきではありません。 mayInterruptIfRunning = trueでキャンセルすることは適切ではありません。現在の実行を中断したくないため、通常の完了を待つだけです。

標準のJDKクラスでこの要件を達成する方法が見つかりませんでした。 質問1:私は間違っていましたが、この種の機能はありますか?

だから私はそれを単独で実装することにしました: import java.util.concurrent。*;

public class GracefullyStoppingScheduledFutureDecorator implements ScheduledFuture { 

/** 
* @return the scheduled future with method special implementation of "cancel" method, 
* which in additional to standard implementation, 
* provides strongly guarantee that command is not in the middle of progress when "cancel" returns 
*/ 
public static ScheduledFuture schedule(Runnable command, long initialDelay, long period, TimeUnit unit, ScheduledExecutorService scheduler) { 
    CancellableCommand cancellableCommand = new CancellableCommand(command); 
    ScheduledFuture future = scheduler.scheduleAtFixedRate(cancellableCommand, initialDelay, period, unit); 
    return new GracefullyStoppingScheduledFutureDecorator(future, cancellableCommand); 
} 

private GracefullyStoppingScheduledFutureDecorator(ScheduledFuture targetFuture, CancellableCommand command) { 
    this.targetFuture = targetFuture; 
    this.runnable = command; 
} 

private final ScheduledFuture targetFuture; 
private final CancellableCommand runnable; 

@Override 
public boolean cancel(boolean mayInterruptIfRunning) { 
    runnable.cancel(); 
    return targetFuture.cancel(mayInterruptIfRunning); 
} 

@Override 
public long getDelay(TimeUnit unit) { 
    return targetFuture.getDelay(unit); 
} 

@Override 
public int compareTo(Delayed o) { 
    return targetFuture.compareTo(o); 
} 

@Override 
public boolean isCancelled() { 
    return targetFuture.isCancelled(); 
} 

@Override 
public boolean isDone() { 
    return targetFuture.isDone(); 
} 

@Override 
public Object get() throws InterruptedException, ExecutionException { 
    return targetFuture.get(); 
} 

@Override 
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 
    return targetFuture.get(timeout, unit); 
} 

private static class CancellableCommand implements Runnable { 

    private final Object monitor = new Object(); 
    private final Runnable target; 
    private boolean cancelled = false; 

    private CancellableCommand(Runnable target) { 
     this.target = target; 
    } 

     public void cancel() { 
      synchronized (monitor) { 
       cancelled = true; 
      } 
     } 

     @Override 
     public void run() { 
      synchronized (monitor) { 
       if (!cancelled) { 
        target.run(); 
       } 
      } 
     } 

    } 

} 

Question2:誰もが上記のコードのエラーを見つけることができますか?

答えて

2

質問2:上記のコードで誰かがエラーを見つけることができますか?

次のシナリオによって再生することができる仮想的なデッドロックがあります:モニタM1を保持

  1. 有するスレッドT1は、
  2. スケジュールされたタスクが実行され は、スレッドT2に(そのモニタM2を保持している)とに望んM1に入力すると、T2はT1がモニタM1を終了するまで待つ必要があります。
  3. T1はタスクをキャンセルすることに決めましたが、モニタM2がタスク自体によってロックされているため、デッドロックが発生しています。 abovr

ほとんどのシナリオは非現実的ですが、すべての可能な例から保護するために、私はロックフリーな方法でコードを書き直すことにしました:

public class GracefullyStoppingScheduledFuture { 

/** 
* @return the scheduled future with method special implementation of "cancel" method, 
* which in additional to standard implementation, 
* provides strongly guarantee that command is not in the middle of progress when "cancel" returns 
*/ 
public static GracefullyStoppingScheduledFuture cheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit, ScheduledExecutorService scheduler) { 
    CancellableCommand cancellableCommand = new CancellableCommand(command); 
    ScheduledFuture future = scheduler.scheduleAtFixedRate(cancellableCommand, initialDelay, period, unit); 
    return new GracefullyStoppingScheduledFuture(future, cancellableCommand); 
} 

private GracefullyStoppingScheduledFuture(ScheduledFuture targetFuture, CancellableCommand command) { 
    this.targetFuture = targetFuture; 
    this.runnable = command; 
} 

private final ScheduledFuture targetFuture; 
private final CancellableCommand runnable; 

public void cancelAndBeSureOfTermination(boolean mayInterruptIfRunning) throws InterruptedException, ExecutionException { 
    try { 
     targetFuture.cancel(mayInterruptIfRunning); 
    } finally { 
     runnable.cancel(); 
    } 
} 

private static class CancellableCommand implements Runnable { 

    private static final int NOT_EXECUTING = 0; 
    private static final int IN_PROGRESS = 1; 
    private static final int CANCELLED_WITHOUT_OBSTRUCTION = 2; 
    private static final int CANCELLED_IN_MIDDLE_OF_PROGRESS = 3; 

    private final AtomicInteger state = new AtomicInteger(NOT_EXECUTING); 
    private final AtomicReference<Thread> executionThread = new AtomicReference<>(); 
    private final CompletableFuture<Void> cancellationFuture = new CompletableFuture<>(); 
    private final Runnable target; 

    private CancellableCommand(Runnable target) { 
     this.target = target; 
    } 

    public void cancel() throws ExecutionException, InterruptedException { 
     if (executionThread.get() == Thread.currentThread()) { 
      // cancel method was called from target by itself 
      state.set(CANCELLED_IN_MIDDLE_OF_PROGRESS); 
      return; 
     } 
     while (true) { 
      if (state.get() == CANCELLED_WITHOUT_OBSTRUCTION) { 
       return; 
      } 
      if (state.get() == CANCELLED_IN_MIDDLE_OF_PROGRESS) { 
       cancellationFuture.get(); 
       return; 
      } 
      if (state.compareAndSet(NOT_EXECUTING, CANCELLED_WITHOUT_OBSTRUCTION)) { 
       return; 
      } 
      if (state.compareAndSet(IN_PROGRESS, CANCELLED_IN_MIDDLE_OF_PROGRESS)) { 
       cancellationFuture.get(); 
       return; 
      } 
     } 
    } 

    @Override 
    public void run() { 
     if (!state.compareAndSet(NOT_EXECUTING, IN_PROGRESS)) { 
      notifyWaiters(); 
      return; 
     } 

     try { 
      executionThread.set(Thread.currentThread()); 
      target.run(); 
     } finally { 
      executionThread.set(null); 
      if (!state.compareAndSet(IN_PROGRESS, NOT_EXECUTING)) { 
       notifyWaiters(); 
      } 
     } 
    } 

    private void notifyWaiters() { 
     if (state.get() == CANCELLED_WITHOUT_OBSTRUCTION) { 
      // no need to notify anything 
      return; 
     } 
     // someone waits for cancelling 
     cancellationFuture.complete(null); 
     return; 
    } 

} 
関連する問題