2016-04-15 13 views
3

私はThreadPoolTaskExecutorを持っています。Runnableを実装するProcessを作成すると、executor.execute(process)を実行します。ThreadPoolTask​​Executor経由でスレッドを繰り返す

は今、executeを呼び出す前に、私はProcessオブジェクトから一つのフィールドをチェックし、他のすべてのは、現在、私のThreadPoolTaskExecutorによって実行のプロセスを、実行しているとそれを比較したいです。どのように私はそれを行うことができますが、同時に発生する問題はありませんか?

コード:

public class MyApp { 

    ThreadPoolTaskExecutor executor; 

    //... 

    public void runProcesses { 
     Process firstone = new Process(1); 
     Process nextOne = new Process(1); 


     // iterate through all processes started via executor and currently running, 
     // verify if there is any process.getX() == 1, if not run it 

     executor.execute(firstone); 

     //wait till firstone will end becouse have the same value of X 
     executor.execute(nextOne); // this cant be perform until the first one will end 
    } 
} 

public class Process { 
    private int x; 

    //... 

    public Process (int x){ 
     this.x = x; 
    } 
    public int getX(){ 
     return this.x; 
    } 

} 

私は、プロセスのシンプルSetをcreateing考えていたが起動し、そこに新しいものを追加します。しかし、私はそれがまだ実行されていると判断し、それが完了したらセットから削除する方法に問題があります。今はスレッドの実行を繰り返すことを考えていますが、どうしたらいいのか分かりません。

+1

- すべてではありません。この種のチェックを行うときは、常に並行性の問題に対処する必要があります。実際にチェックしているのは何ですか?おそらくあなたは別の方法でそれをすることができますか? – Fildor

+0

私が言及したことを確かにチェックしています。言い換えると、フィールドxの値が同じであるプロセスは2つありません。私は私がする必要があるwhantrateに多くのコードを追加しました。 – ilovkatie

答えて

3

私はあなたの初期のアイデアはかなり良いと思うし、多すぎるコードで作業することができます。それは切り離すために、いくつかの工夫が必要になります「この値のRunnableがすでに実行されている」「このRunnableを実行」から、しかし、ここではそれについて世話をしていないラフイラストです :

  • equals()を実装し、 hashCode()Processに設定すると、順序付けされていないセットやマップでもインスタンスを安全に使用できます。
  • あなたがマップのputIfAbsent()メソッドを使用したいと思いますので、あなたがCollections.newSetFromMap(new ConcurrentHashMap<Process, Boolean>)を使用されることはありませんConcurrentMap<Process, Boolean>
    • を作成します。
  • は、あなたが提出して返された値がnullない場合は救済されることputIfAbsent()Processを使用して、それに追加してみてください。
    • null戻り値は、すでにマップに(したがって処理中の)Processが存在することを意味します。
    • 非常にきれいではない解決策は、各Processインスタンスにマップへの参照を挿入し、run()メソッドで最初に行う作業はputIfAbsent(this, true)です。
  • 処理を終了した各Processを削除します。
    • 些細ではなく非常にきれいな解決策は、各Processインスタンスにマップへの参照を注入して、あなたのrun()方法で行う最後のものとしてremove(this)を持っているだろう。
    • ProcessにはCallableが実装され、その結果としてユニークな値が返されるため、マップから削除するか、CompletableFutureとそのthenAccept()コールバックを使用できます。

Here's上記自明としない非常にきれいな溶液(直接ここに貼り付けるにはあまりにも長いコード)を示すサンプル。

+0

私にとって完璧な音ですが、私は間違いなく試してみます。 – ilovkatie

+0

しかし、それは 'ConcurrentMap 'ではないでしょうか? Xフィールド(整数型)と比較したいからです。あるいは、 'equals()'と 'hashCode()'の実装はXフィールドだけに焦点を当てるべきですか? – ilovkatie

+0

「先輩が終了するまで待つ」は成就していませんよね? – hahn

1

@Dimitarがこの問題を解決するための非常に良い解決策を提供しましたが、別のアプローチで追加したいと思います。

あなたの要件を満たしていれば、すべてProcessを提出して、xで分割して別々のキューにまとめ、キュー内のプロセスを1つずつ実行する必要があるようです。 ThreadPoolExecutor

APIはエグゼキュータの行動を強化するために権限を与えると私はThreadPoolExecutorの次の実装に来た:「同時問題が発生していない...」

ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 
     0L, TimeUnit.MILLISECONDS, 
     new LinkedBlockingQueue<>()) { 

    private final ConcurrentMap<Integer, Queue<Runnable>> processes = new ConcurrentHashMap<>(); 

    @Override 
    public void execute(Runnable command) { 
     if (command instanceof Process) { 
      int id = ((Process) command).getX(); 
      Queue<Runnable> t = new ArrayDeque<>(); 
      Queue<Runnable> queue = this.processes.putIfAbsent(id, t); 
      if (queue == null) { 
       queue = t; 
      } 
      synchronized (queue) { 
       queue.add(command); 
       if (!processes.containsKey(id)) { 
        processes.put(id, queue); 
       } 
       if (queue.size() == 1) { 
        super.execute(queue.peek()); // removal of current process would be done in #afterExecute 
       } 
      } 
     } else { 
      super.execute(command); 
     } 
    } 

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 
     super.afterExecute(r, t); 
     if (r instanceof Process) { 
      int id = ((Process) r).getX(); 
      Queue<Runnable> queue = this.processes.get(id); 
      synchronized (queue) { 
       queue.poll(); // remove completed prev process 
       Runnable nextProcess = queue.peek(); // retrieve next process 
       if (nextProcess != null) { 
        super.execute(nextProcess); 
       } else { 
        this.processes.remove(id); 
       } 
      } 
     } 
    } 
} 
関連する問題