2012-05-04 21 views
15

await()を使用して、サイズ1のCountDownLatchで待機している複数のコンシューマスレッドがあります。私は成功したときにcountDown()を呼び出す単一のプロデューサスレッドを持っています。CountDownLatchを「キャンセル」するにはどうすればよいですか?

エラーがない場合に効果的です。

しかし、プロデューサがエラーを検出した場合、そのエラーをコンシューマスレッドに通知できるようにしたいと思います。理想的には、プロデューサにabortCountDown()のようなものを呼び出し、すべてのコンシューマにInterruptedExceptionやその他の例外を受け取らせることができます。 countDown()に電話する必要はありません。これは、私のコンシューマスレッドのすべてが、await()への呼び出しの後に、成功のために追加のマニュアルチェックを行う必要があるためです。私はむしろ彼らは例外を受け取り、彼らはすでにどのように処理するのか知っています。

アボート機能はCountDownLatchにはありません。カウントダウンを中止するのに有効なCountDownLatchを効果的に作成するために簡単に適応できる別の同期プリミティブがありますか?

答えて

11

JBニジェットは素晴らしい答えを得ました。私は彼を取って少し磨いた。結果は、AbortableCountDownLatchと呼ばれるCountDownLatchのサブクラスであり、クラスに「abort()」メソッドを追加して、ラッチで待機しているすべてのスレッドがAbortException(InterruptedExceptionのサブクラス)を受け取るようにします。

また、JBortのクラスとは異なり、AbortableCountDownLatchは、カウントダウンがゼロになるのを待つのではなく、アボートですぐにすべてのブロッキングスレッドを中止します(カウント> 1の場合)。

import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.TimeUnit; 

public class AbortableCountDownLatch extends CountDownLatch { 
    protected boolean aborted = false; 

    public AbortableCountDownLatch(int count) { 
     super(count); 
    } 


    /** 
    * Unblocks all threads waiting on this latch and cause them to receive an 
    * AbortedException. If the latch has already counted all the way down, 
    * this method does nothing. 
    */ 
    public void abort() { 
     if(getCount()==0) 
      return; 

     this.aborted = true; 
     while(getCount()>0) 
      countDown(); 
    } 


    @Override 
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException { 
     final boolean rtrn = super.await(timeout,unit); 
     if (aborted) 
      throw new AbortedException(); 
     return rtrn; 
    } 

    @Override 
    public void await() throws InterruptedException { 
     super.await(); 
     if (aborted) 
      throw new AbortedException(); 
    } 


    public static class AbortedException extends InterruptedException { 
     public AbortedException() { 
     } 

     public AbortedException(String detailMessage) { 
      super(detailMessage); 
     } 
    } 
} 
+0

このクラスの使い方、私の状況はリストであり、リストはリアルタイムで動的に更新されています。自動イベントが2分間待たなければならないリストに入っていますが、待機時間User Manual Eventがリストに入ってくると、待機を中断して直ちに処理を進めなければなりません。 –

12

は内部たCountDownLatchを使用して、特定の、より高いレベルのクラス内でこの動作をカプセル化:

public class MyLatch { 
    private CountDownLatch latch; 
    private boolean aborted; 
    ... 

    // called by consumers 
    public void await() throws AbortedException { 
     latch.await(); 
     if (aborted) { 
      throw new AbortedException(); 
     } 
    } 

    // called by producer 
    public void abort() { 
     this.aborted = true; 
     latch.countDown(); 
    } 

    // called by producer 
    public void succeed() { 
     latch.countDown(); 
    } 
} 
+1

このスレッドセーフにするには、「強制終了」しないでください。 –

+5

いいえ、スレッド安全性はCountDownLatchによって保証されるため、countDownメソッドはスレッド間にメモリバリアが存在することを保証します。 javadocは次のように述べています。「countDown()を呼び出す前のスレッド内のアクションは、対応するawait()から正常に返されたアクションの前に発生します。 –

+0

はい、今はそれを読むことを覚えています。ありがとう。 –

3

あなたはウェイターをキャンセルする機能を提供しCountDownLatchのラッパーを作成することができます。待機中のスレッドを追跡し、タイムアウト時に解放する必要があります。また、ラッチがキャンセルされたことを覚えておいて、今後のawaitへの呼び出しはすぐに中断されます。

public class CancellableCountDownLatch 
{ 
    final CountDownLatch latch; 
    final List<Thread> waiters; 
    boolean cancelled = false; 

    public CancellableCountDownLatch(int count) { 
     latch = new CountDownLatch(count); 
     waiters = new ArrayList<Thread>(); 
    } 

    public void await() throws InterruptedException { 
     try { 
      addWaiter(); 
      latch.await(); 
     } 
     finally { 
      removeWaiter(); 
     } 
    } 

    public boolean await(long timeout, TimeUnit unit) throws InterruptedException { 
     try { 
      addWaiter(); 
      return latch.await(timeout, unit); 
     } 
     finally { 
      removeWaiter(); 
     } 
    } 

    private synchronized void addWaiter() throws InterruptedException { 
     if (cancelled) { 
      Thread.currentThread().interrupt(); 
      throw new InterruptedException("Latch has already been cancelled"); 
     } 
     waiters.add(Thread.currentThread()); 
    } 

    private synchronized void removeWaiter() { 
     waiters.remove(Thread.currentThread()); 
    } 

    public void countDown() { 
     latch.countDown(); 
    } 

    public synchronized void cancel() { 
     if (!cancelled) { 
      cancelled = true; 
      for (Thread waiter : waiters) { 
       waiter.interrupt(); 
      } 
      waiters.clear(); 
     } 
    } 

    public long getCount() { 
     return latch.getCount(); 
    } 

    @Override 
    public String toString() { 
     return latch.toString(); 
    } 
} 
+0

同じものを使用する例を説明してください。私の状況は私はリストを持っていて、リストはリアルタイムで動的に更新されています。自動イベントが2分間待たなければならないリストに入っていますが、待機時間User Manual Eventがリストに入ってくると、待機を中断して直ちに処理を進めなければなりません。 –

0

あなたは、その保護されたgetWaitingThreadsメソッドへのアクセスを可能にするReentrantLockを使用して、独自のCountDownLatchをロールアウトすることができます。

例:

public class FailableCountDownLatch { 
    private static class ConditionReentrantLock extends ReentrantLock { 
     private static final long serialVersionUID = 2974195457854549498L; 

     @Override 
     public Collection<Thread> getWaitingThreads(Condition c) { 
      return super.getWaitingThreads(c); 
     } 
    } 

    private final ConditionReentrantLock lock = new ConditionReentrantLock(); 
    private final Condition countIsZero = lock.newCondition(); 
    private long count; 

    public FailableCountDownLatch(long count) { 
     this.count = count; 
    } 

    public void await() throws InterruptedException { 
     lock.lock(); 
     try { 
      if (getCount() > 0) { 
       countIsZero.await(); 
      } 
     } finally { 
      lock.unlock(); 
     } 
    } 

    public boolean await(long time, TimeUnit unit) throws InterruptedException { 
     lock.lock(); 
     try { 
      if (getCount() > 0) { 
       return countIsZero.await(time, unit); 
      } 
     } finally { 
      lock.unlock(); 
     } 
     return true; 
    } 

    public long getCount() { 
     lock.lock(); 
     try { 
      return count; 
     } finally { 
      lock.unlock(); 
     } 
    } 

    public void countDown() { 
     lock.lock(); 
     try { 
      if (count > 0) { 
       count--; 

       if (count == 0) { 
        countIsZero.signalAll(); 
       } 
      } 
     } finally { 
      lock.unlock(); 
     } 
    } 

    public void abortCountDown() { 
     lock.lock(); 
     try { 
      for (Thread t : lock.getWaitingThreads(countIsZero)) { 
       t.interrupt(); 
      } 
     } finally { 
      lock.unlock(); 
     } 
    } 
} 

あなたはそれがキャンセルされた後awaitへの新しいコールにInterruptedExceptionをスローするように、このクラスを変更することができます。もしあなたがその機能を必要とするなら、このクラスをCountDownLatchまで拡張することさえできます。