私の場合、優先順位に基づいてタスクを実行できるエグゼキュータが必要です。これを実現する簡単な方法は、PriorityBlockingQueueでスレッドプールを使用し、newTaskFor()をオーバーライドして、タスクの優先順位に基づいて比較可能なカスタムの将来のタスクを返すことです。これで優先タスクを持つスレッドプールエグゼキュータ
//Define priorities
public enum Priority {
HIGH, MEDIUM, LOW, VERYLOW;
}
優先度のタスク
//A Callable tasks that has priority. Concrete implementation will implement
//call() to do actual work and getPriority() to return priority
public abstract class PriorityTask<V> implements Callable<V> {
public abstract Priority getPriority();
}
実際のエグゼキュータの実装
public class PriorityTaskThreadPoolExecutor <V> {
int _poolSize;
private PriorityBlockingQueue<Runnable> _poolQueue =
new PriorityBlockingQueue<Runnable>(500);
private ThreadPoolExecutor _pool;
public PriorityTaskThreadPoolExecutor (int poolSize) {
_poolSize = poolSize;
_pool = new ThreadPoolExecutor(_poolSize, _poolSize, 5, TimeUnit.MINUTES,
_poolQueue) {
//Override newTaskFor() to return wrap PriorityTask
//with a PriorityFutureTaskWrapper.
@Override
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
return new PriorityFutureTaskWrapper<V>((PriorityTask<V>) c);
}
};
_pool.allowCoreThreadTimeOut(true);
}
public Future<V> submit (PriorityTask<V> task) {
return _pool.submit(task);
}
}
//A future task that wraps around the priority task to be used in the queue
class PriorityFutureTaskWrapper<V> extends FutureTask<V>
implements Comparable <PriorityFutureTaskWrapper<V>> {
PriorityTask<V> _priorityTask;
public PriorityFutureTaskWrapper (PriorityTask<V> priorityTask) {
super(priorityTask);
_priorityTask = priorityTask;
}
public PriorityTask<V> getPriorityTask() {
return _priorityTask;
}
@Override
public int compareTo(PriorityFutureTaskWrapper<V> o) {
return _priorityTask.getPriority().ordinal() -
o.getPriorityTask().getPriority().ordinal();
}
}
問題は、私のユースケースでは、優先度の低いタスクが永遠に飢えることが可能性がある、です。私はこれを避けたい。私はjavaで利用可能なエグゼキュータ/プールを使用してこれを行うきれいな方法を見つけることができませんでした。だから私は自分のエグゼクターを書くことを考えています。私には2つの異なるアプローチがあります。
1)PriorityBlockingQueueを持つカスタムスレッドプール。別のスレッドがあり、キュー内のタスクの経過時間をチェックします。古いタスクは削除され、エスカレートされた優先度で再追加されます。
2)私のユースケースでは、優先度が1〜4と限られています。私は優先度ごとに4つの異なるキューを持っています。キュー上でブロックするのではなく、カスタムプール内のスレッドは、次のタスクを実行する必要があるときに、次の順序でキューをスキャンします。
40%スレッド - Q1、Q2、Q3、Q4
30%スレッド - Q2、Q1、Q3、Q4
20%スレッド - Q3、Q1、Q2、Q4
10 %スレッド - Q4、Q1、Q2、Q3
スキャンは、キューに新規追加が通知されたとき、またはそのスレッドによって実行されている現在のタスクが完了したときにスレッドによって行われます。それ以外の場合は、スレッドが待機します。しかし、スキャンはキューをブロックするのに比べると非効率になります。
私の用途にはApprach 2が適しています。
これらのアプローチのいずれかを試したことがありますか、または同様の用途に別のアプローチを試みたことはありますか?任意の思考/提案?
エスカレーションを追加するために、既存のPriorityTaskの実装を変更できませんでしたか? OR:あなたの "compareTo"に、タスクの年齢を考慮する手段を追加します。したがって、あなたの「有効な」優先度は、優先度フィールドの価値だけでなく、タスクの年齢です。 – Fildor
エスカレートするには、PriorityTaskはCPUサイクルを取得する必要があります。スレッドまたは別のスレッドの1つが、チェックしてエスカレートする必要があります。キューから削除し、エスカレーション後に再追加して、新しい優先順位に基づく位置に挿入するようにしてください。それが私の記事で述べた第1のアプローチです。しかし、私はそのアプローチについて非常に確信していません... – Rajesh
私がかつて使ったもう一つの方法は、 "計画された排水"を実施したことでした。つまり、低負荷の確率が高い1日に1回、Queue全体を取り出し、別のSingleThreadPoolExecutorで処理された別のキューに排水しました。 – Fildor