2016-12-21 29 views
1

私はThreadExecutorPoolの中でカスタムブロッキングキューを使用していますが、タスクワーカーはタスクを取らず、ディスパッチャスレッドは新しいタスクをキューに入れません。カスタムLinkedBlockingQueueデッドロック

カスタムブロッキングキューの実装がデッドロックの原因となっているのは不思議です。このコードに間違いはありますか? add()take()の方法では、​​ブロックが良いですか?

import java.util.Collection; 
import java.util.concurrent.LinkedBlockingQueue; 

import org.apache.log4j.Logger; 

import com.ttech.utils.alarm.Alarm; 
import com.ttech.utils.alarm.AlarmInterface; 
import com.ttech.utils.counter.Counter; 
import com.ttech.utils.counter.SNMPAgent; 

public class WorkerQueue<E> extends LinkedBlockingQueue<E> { 

    private static final long serialVersionUID = 1L; 

    public Integer lowThreshold; 

    public Integer highThreshold; 

    public Integer capacity; 

    public String name; 

    public String type; 

    public Counter counter = null; 

    public boolean writeAlarmLog; 

    public static final Logger logger = Logger.getLogger(WorkerQueue.class); 

    public static Alarm HighThresholdAlarm = null; 
    public static Alarm CapacityAlarm = null; 

    // Check the size here and clear capacity and high threshold alarms in case 
    public E take() throws InterruptedException { 
     E data = super.take(); 
     counter.setNewValue(super.size()); 
     if (super.size() == lowThreshold) {    
      if(!this.writeAlarmLog) { 
       HighThresholdAlarm.clear(name); 
       CapacityAlarm.clear(name); 
      } else { 
       HighThresholdAlarm.clearLog(name, "Queue High Threshold"); 
       CapacityAlarm.clearLog(name, "Queue Capacity Overload"); 
      } 
     } 
     return data; 
    } 

    public E poll() { 
     E data = super.poll(); 
     counter.setNewValue(super.size()); 
     if (super.size() == lowThreshold) { 
      if(!this.writeAlarmLog) { 
       HighThresholdAlarm.clear(name); 
       CapacityAlarm.clear(name); 
      } else { 
       HighThresholdAlarm.clearLog(name, "Queue High Threshold"); 
       CapacityAlarm.clearLog(name, "Queue Capacity Overload"); 
      } 
     } 
     return data; 
    } 


    public int drainTo(Collection<? super E> c, int maxElements){ 
     int size = super.drainTo(c,maxElements);  
     counter.setNewValue(super.size());  
     return size; 
    } 

    // During adding the data to queue check capacity and high threshold raise alarm in case 
    public boolean add(E data) { 
     Boolean rc = true; 

     if (capacity > 0) { 
      if (this.size() >= capacity) { 
       logger.error("Queue " + name + " is over capacity"); 
       if(!this.writeAlarmLog) 
        CapacityAlarm.raise(name); 
       else 
        CapacityAlarm.raiseLog(AlarmInterface.AS_CRITICAL, name, "Queue Capacity Overload"); 
       return false; 
      } 
     } 

     if (!super.add(data)) { 
      logger.error("Cannot add data to queue:" + name); 
      rc = false; 
     } else { 
      counter.setNewValue(super.size()); 
     } 

     if (highThreshold == super.size()) { 


      if(!this.writeAlarmLog) 
       HighThresholdAlarm.raise(name); 
      else 
       HighThresholdAlarm.raiseLog(AlarmInterface.AS_CRITICAL, name, "Queue High Threshold"); 
     } 

     return rc; 
    } 
} 
+0

はいクラススコープのメンバー(変数とオブジェクト)があるのでスレッドセーフにするには 'synchronized'メソッドが必要だと思うAFAIKこれはスレッドセーフではありません。 – Yazan

+1

私は、しばしば同時実行の問題につながるため、並列データ構造をサブクラス化しないことを勧めます。さらに、アラームを発生させる原因となるキューを作成することは、予想される範囲外のように見えます(単なるデータ構造であることに注意してください)。 Teppicが示唆しているように、 'ThreadPoolExecutor'をサブクラス化し、' beforeExecute'と 'afterExecute'を使用して目標を達成する必要があります。 – Spotted

+0

注意:すべての 'public'属性は本当に恐ろしいです!このクラスを楽しんでください。 ;-) – Spotted

答えて

2

ThreadPoolExecutorその作業キューにないaddタスクを実行します。それはoffersであり、それが受け入れられない場合は、設定されたRejectedExecutionHandlerに渡されます。デフォルトではabort policy handlerで、RejectedExecutionExceptionがスローされます。

カスタムキューのaddメソッドが呼び出されることはありません。

実行中のタスクの数の変更を追跡する場合は、実行者自身のbeforeExecuteまたはafterExecuteメソッドをオーバーライドすることをお勧めします。アクティブタスクの数はgetActiveCountから取得できます。

+0

ありがとう、私はオーバーライドoffer()メソッドもあります。できます。これらのオーバーライドされたメソッドに対して、同期化されたブロックを追加する必要がありますか? –

関連する問題