私はThreadExecutorPool
の中でカスタムブロッキングキューを使用していますが、タスクワーカーはタスクを取らず、ディスパッチャスレッドは新しいタスクをキューに入れません。カスタムLinkedBlockingQueueデッドロック
カスタムブロッキングキューの実装がデッドロックの原因となっているのは不思議です。このコードに間違いはありますか? add()
とtake()
の方法では、ブロックが良いですか?
import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.log4j.Logger;
import com.ttech.utils.alarm.Alarm;
import com.ttech.utils.alarm.AlarmInterface;
import com.ttech.utils.counter.Counter;
import com.ttech.utils.counter.SNMPAgent;
public class WorkerQueue<E> extends LinkedBlockingQueue<E> {
private static final long serialVersionUID = 1L;
public Integer lowThreshold;
public Integer highThreshold;
public Integer capacity;
public String name;
public String type;
public Counter counter = null;
public boolean writeAlarmLog;
public static final Logger logger = Logger.getLogger(WorkerQueue.class);
public static Alarm HighThresholdAlarm = null;
public static Alarm CapacityAlarm = null;
// Check the size here and clear capacity and high threshold alarms in case
public E take() throws InterruptedException {
E data = super.take();
counter.setNewValue(super.size());
if (super.size() == lowThreshold) {
if(!this.writeAlarmLog) {
HighThresholdAlarm.clear(name);
CapacityAlarm.clear(name);
} else {
HighThresholdAlarm.clearLog(name, "Queue High Threshold");
CapacityAlarm.clearLog(name, "Queue Capacity Overload");
}
}
return data;
}
public E poll() {
E data = super.poll();
counter.setNewValue(super.size());
if (super.size() == lowThreshold) {
if(!this.writeAlarmLog) {
HighThresholdAlarm.clear(name);
CapacityAlarm.clear(name);
} else {
HighThresholdAlarm.clearLog(name, "Queue High Threshold");
CapacityAlarm.clearLog(name, "Queue Capacity Overload");
}
}
return data;
}
public int drainTo(Collection<? super E> c, int maxElements){
int size = super.drainTo(c,maxElements);
counter.setNewValue(super.size());
return size;
}
// During adding the data to queue check capacity and high threshold raise alarm in case
public boolean add(E data) {
Boolean rc = true;
if (capacity > 0) {
if (this.size() >= capacity) {
logger.error("Queue " + name + " is over capacity");
if(!this.writeAlarmLog)
CapacityAlarm.raise(name);
else
CapacityAlarm.raiseLog(AlarmInterface.AS_CRITICAL, name, "Queue Capacity Overload");
return false;
}
}
if (!super.add(data)) {
logger.error("Cannot add data to queue:" + name);
rc = false;
} else {
counter.setNewValue(super.size());
}
if (highThreshold == super.size()) {
if(!this.writeAlarmLog)
HighThresholdAlarm.raise(name);
else
HighThresholdAlarm.raiseLog(AlarmInterface.AS_CRITICAL, name, "Queue High Threshold");
}
return rc;
}
}
はいクラススコープのメンバー(変数とオブジェクト)があるのでスレッドセーフにするには 'synchronized'メソッドが必要だと思うAFAIKこれはスレッドセーフではありません。 – Yazan
私は、しばしば同時実行の問題につながるため、並列データ構造をサブクラス化しないことを勧めます。さらに、アラームを発生させる原因となるキューを作成することは、予想される範囲外のように見えます(単なるデータ構造であることに注意してください)。 Teppicが示唆しているように、 'ThreadPoolExecutor'をサブクラス化し、' beforeExecute'と 'afterExecute'を使用して目標を達成する必要があります。 – Spotted
注意:すべての 'public'属性は本当に恐ろしいです!このクラスを楽しんでください。 ;-) – Spotted