2017-03-10 11 views
1

BlockingQueueRunnableです。すべてのタスクをTaskExecutorの実装の1つを使用して実行するだけで、すべてが並行して実行されます。 しかし、Runnableの一部は他人に依存しているため、Runnableが完了したら待つ必要があることを意味し、実行することができます。Spring TaskExecutorフレームワークでスレッドを管理する方法

ルールは非常に単純です:すべてRunnableにコードがあります。同じコードを持つ2つのRunnableは同時に実行することはできませんが、コードが異なる場合は並列に実行する必要があります。 つまり、すべての実行中のRunnableは異なるコードを持つ必要があり、すべての「複製」が待機する必要があります。

問題は、スレッドが終了したときにイベント/メソッド/ものがないことです。私ができる は、すべてのRunnableに、このような通知を建てたが、それだけでスレッドが終了する前に行われますので、それは

java.util.concurrent.ThreadPoolExecutor終了方法afterExecuteを持っていないの後、私は、このアプローチが好きではないが、それが実装する必要があります - 春デフォルト実装のみを使用し、このメソッドは無視されます。

Runnableが既に実行中(この情報へのアクセスは実装されていません)と、コードが重複しているため延期されている2つの追加コレクションを追跡する必要があるため、複雑になります。

私はBlockingQueueのようにポーリングがないので、新しいものがキューに入ったときにスレッドをアクティブにするだけです。しかし、おそらくRunnableの間のそのような依存関係を管理するためのより良いアプローチがあるので、私はBlockingQueueをあきらめて別の戦略を使用する必要がありますか?

答えて

0

考えられるべき代替戦略の1つは、考えられるコードごとに個別のシングルスレッドエグゼキュータを用意することです。次に、新しいRunnableを提出する場合は、コードに使用する正しいエグゼキュータを検索してジョブをサブミットします。

これは、あなたが持っているコードの数によっては、良い解決策であるかもしれません。考慮すべき主なことは、実行中のスレッドの同時実行数が、異なるコードの数と同程度になることです。多くの異なるコードがある場合、これは問題になる可能性があります。

もちろん、同時実行ジョブの数を制限するにはSemaphoreを使用できます。コードごとに1つのスレッドを作成しますが、実際に同時に実行できるのは限られた数だけです。例えば、これは、同時に実行するために、3つの異なるコードまで可能、コードによってジョブをシリアライズなる:

public class MultiPoolExecutor { 
    private final Semaphore semaphore = new Semaphore(3); 

    private final ConcurrentMap<String, ExecutorService> serviceMap 
      = new ConcurrentHashMap<>(); 

    public void submit(String code, Runnable job) { 
     ExecutorService executorService = serviceMap.computeIfAbsent(
       code, (k) -> Executors.newSingleThreadExecutor()); 
     executorService.submit(() -> { 
      semaphore.acquireUninterruptibly(); 
      try { 
       job.run(); 
      } finally { 
       semaphore.release(); 
      } 
     }); 
    } 
} 

別のアプローチは、ロックを解除し、完了時に実行することができたジョブをチェックするためにRunnableを変更することであろう(したがって、ポーリングを避ける) - この例のように、すべてのジョブをサブミットできるまでリストに保持します。ブーリアン・ラッチは、各コードのジョブが一度にスレッドプールに提出されることを保証します。新しいジョブが到着するか実行中のジョブが完了するたびに、コードはサブミットできる新しいジョブを再度チェックします(CodedRunnableは単にコードプロパティを持つRunnableの拡張です)。

public class SubmissionService { 
    private final ExecutorService executorService = Executors.newFixedThreadPool(5); 
    private final ConcurrentMap<String, AtomicBoolean> locks = new ConcurrentHashMap<>(); 
    private final List<CodedRunnable> jobs = new ArrayList<>(); 

    public void submit(CodedRunnable codedRunnable) { 
     synchronized (jobs) { 
      jobs.add(codedRunnable); 
     } 
     submitWaitingJobs(); 
    } 

    private void submitWaitingJobs() { 
     synchronized (jobs) { 
      for(Iterator<CodedRunnable> iter = jobs.iterator(); iter.hasNext();) { 
       CodedRunnable nextJob = iter.next(); 
       AtomicBoolean latch = locks.computeIfAbsent(
         nextJob.getCode(), (k) -> new AtomicBoolean(false)); 
       if(latch.compareAndSet(false, true)) { 
        iter.remove(); 
        executorService.submit(() -> { 
         try { 
          nextJob.run(); 
         } finally { 
          latch.set(false); 
          submitWaitingJobs(); 
         } 
        }); 
       } 
      } 
     } 
    } 
} 

このアプローチの欠点は、各タスクが完了した後でコードが待機中のジョブのリスト全体をスキャンする必要があることです。もちろん、これをより効率的にすることができます。完了するタスクは、実際には同じコードを持つ他のジョブをチェックするだけで済みます。そのため、ジョブはより速い処理を可能にするためにMap<String, List<Runnable>>構造体に格納できます。

+0

私はこの最初のアプローチが似ていますが、必要でないときにExecutorServiceが破棄された方が良いでしょう。 – Marx

1

異なるコードの数がそれほど多くない場合、BarrySW19によって提供される可能なコードごとに別々のシングルスレッドエグゼキュータを使用する方法は問題ありません。 スレッドの全体数は、代わりにシングルスレッド実行部から、我々は(アッカまたは他の同様のライブラリから)アクターを使用することができ、許容できなくなる場合:元の溶液のよう

public class WorkerActor extends UntypedActor { 
    public void onReceive(Object message) { 
    if (message instanceof Runnable) { 
     Runnable work = (Runnable) message; 
     work.run(); 
    } else { 
     // report an error 
    } 
    } 
} 

WorkerActor秒間ActorRef秒HashMapに集められます。与えられたコードに対応するActorRef workerActorRefが取得(検索または作成)されると、Runnable jobworkerActorRef.tell(job)で実行に提出されます。

あなたが俳優のライブラリへの依存関係を持ってしたくない場合は、あなたがゼロからWorkerActorをプログラムすることができます。

public class WorkerActor implements Runnable, Executor { 
    Executor executor=ForkJoinPool.commonPool(); // or can by assigned in constructor 
    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueu<>(); 
    boolean running = false; 

    public synchronized void execute(Runnable job) { 
    queue.put(job); 
    if (!running) { 
     executor.execute(this); // execute this worker, not job! 
     running=true; 
    } 

    public void run() { 
    for (;;) { 
     Runnable work=null; 
     synchronized (this) { 
     work = queue.poll(); 
     if (work==null) { 
      running = false; 
      return; 
     } 
     } 
     work.run(); 
    } 
    } 
} 

与えられたコードに対応するWorkerActor workerを取得(取得したまたは作成)されると、 Runnable jobworker.execute(job)で実行されます。

+0

同じコードで複数のジョブを連続して送信した場合、最も一般的なプールスレッドは最初のループスレッドがループして待っているだけで、他のコードはブロックされますか? – BarrySW19

+0

@ BarrySW19各ユーザーのジョブは対応するコードでWorkerActorにキューイングされるため、プールスレッドを占有しません。アクタ自体は、多くとも1つのスレッドで占有することができます。したがって、最初に同じコードを持つ多くのジョブがサブミットされた場合、プールスレッドのみがそのジョブを順番に1つずつ処理するWorkerActorで占有されます。 –

+0

ああ、私はそれを得る - 仕事が提出されるたびに、そのキューが空になるまでActorがループする - 明らかに作業中に提出されたジョブは、そのプロセスの一部として取り上げられる。これは、私の第2の提案で次のタスクを選択するのに使用される効率の悪いスキャンを回避する。 – BarrySW19

関連する問題