2017-08-24 10 views
2

は、BlockingQueuesのタスクを処理するWorkerThreadsを使用してマルチスレッドアプリケーションを構築しています。作業者は、抽象クラスと同様に、サブクラスはprocessItem()を実装しています。WorkerThread:処理が完了するのを待ちます(BlockingQueue)

abstract class WorkerThread extends Thread { 
    BlockingQueue<Task> q; 
    int tasksInSystem; // globally available 

    public void run() { 
     while(!interrupted()) { 
      Task t = q.take(); 
      process(t); 
      tasksInSystem--; 
     } 
    } 

    abstract void process(Task t); 
} 

特別なことは、すべてのタスクが完了するまで待つことです。 私の最初のアイデアは、となりました:

  • が追加された各タスク
  • を数える処理完了時にカウンタを減らします。

しかし、 しかし、タスクの種類と作業者の実装と複数のキューがあります。だから私は、さまざまなカウンターのトンを維持する必要があります。 「飛行中」のタスクを追跡し、それらが行われたときに知らせるためにワーカープロセスを必要とするキューを必要とする

q.waitForEmptyAndCompleted()

(代わりに:私はしたいのですがどのような

tasksInsystem---;)。

作業者は、タスクをキューから取り出した後にカウントする必要があるため、そのカウンタを増やすことはできません。しかし、take()呼び出しの直後に別のスレッドが実行され、ワー​​カーが事前にカウンタを増やすことができなかった可能性があります。

したがって、カウンタの増加とtake()は一緒に(atomar)結合されている必要があります。特定のBlockingQueueにつながります。

私はpremadeソリューションを見つけませんでした。だから私の最高の推測は私自身のBlockingQueueを実装することです。スレッドセーフなブロッキングキューを独自に実装してテストするのを避けるために、代わりに使用できるものはありますか?あるいは、その待機呼出しを別々に実装する考えはありますか?

+2

'ExecutorService'を使用してシャットダウンできますか?エグゼキュータサービスはすべてのタスクを管理し、キューを関連付ける必要があります。シャットダウンは、すべてのタスクが完了するまでブロックされます。 – hgrey

+0

お返事ありがとうございます。私は実際にはExecutorServiceを実際に使用していました。しかし、私がshutdown()を呼び出すと、サービス全体が停止します。私は実際にキューを再利用したいと思います。 waitForEmptyAndComplete()の後に起こるものは新しいタスクをトリガーします。だから私は各ラウンドのために新しいExecutorServiceを設定する必要があります。少なくとも、それは私自身のキューバリアントを実装するよう強制されません。 – markus

答えて

1

一般的にはExecutorServiceで十分ではないので、おそらくForkJoinPoolが動作しますが、明示的にキューを公開することはありませんが、説明した内容を使用すると非常に使いやすいはずです。

キーメソッドはawaitQuiescence(long timeout, TimeUnit unit)で、送信されたすべてのタスクの実行が完了するまで待機します。

関連する問題