2016-05-24 10 views
0

私は非常に長い計算を実行でき、中断しなければならないプログラムを開発しています(Traveling Salesman Problem)。パフォーマンスを得るために、実行中のマシン上に論理コアと同じ数のスレッドを使用したいと思います。マルチスレッドの中断可能な連続計算を実行する最適な方法は何ですか?

私の問題は、私が扱っている方法が最善であるかどうかはわかりません。各スレッドは、タイムアウトまで最良の計算された解を返さなければなりません。私の並列コードは100行のコードを持っているので、スレッドのメインループでスレッドが何度か中断されたかどうかを調べることは優雅です。瞬間のために

、これを達成するために、私はそのようなことを考えていた:私たちはここに何

int countThreads = Runtime.getRuntime().availableProcessors(); 
List<Solution> solutions = new ArrayList<Solution>(countThreads); 
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(countThreads + 1); 

//Thread that cancel all the other one after the timeout 
executor.schedule(new Runnable() { 
    @Override 
    public void run() { 
     executor.shutdownNow(); 
    } 
}, timeMax, TimeUnit.SECONDS); 

//Threads that compute 
for(int i = 0; i < countThreads; i++) { 
    Solution currentSolution = new Solution(); 
    solutions.add(currentSolution); 

    ThreadedSolutionFinder solutionFinder = 
      new ThreadedSolutionFinder(seed, instance, currentSolution); 
     executor.submit(solutionFinder); 
} 

// The main thread waits 
try { 
    executor.awaitTermination(timeMax, TimeUnit.SECONDS); 
} catch (InterruptedException e) {} 
System.out.println("All tasks terminated"); 

//Iterate over all the found solutions and find the best 
//... 

を、ということがメインスレッド、スレッドごとにインスタンス化する一つの解決策であるとスレッドの引数としてそれを与えますコンストラクタ。これらのスレッドのrun()メソッドは、指定されたソリューションを入力します。

しかし、shutdownNow()コマンドの後のスレッドは、Thread.interrupted()をチェックしてメインスレッドのawaitTermination()が十分に持続しないと実行を続けることができます。これは、ベストを見つけるためにすべてのソリューションを反復処理するときの並行性アクセスを意味します。

私はこのデザインでは納得できません。幾人かのアイディアがありますか?

+0

私はそれにも納得できません。なぜ、 'executor.submit(...)'によって返された先物を集めて、それらが完了するのにXよりも長い時間がかかると取り消すのはなぜですか? –

+0

各スレッドは、中断された場合でもソリューションを返さなければならないからです。計算に数分かかることがあり、プログラムは2秒後に中断することができます。実際、各スレッドはランダムな解を生成しますが(速いですが)、解を最適化するため、このステップは非常に遅いです。 –

+0

中断された場合はどのような結果を返せますか? –

答えて

0

リストにExecutorService.submitによって返さFutureインスタンスを置く:

List<Future<?>> futures = new ArrayList<>(); 
for (int i = 0; i < countThreads; ++i) { 
    ThreadedSolutionFinder solutionFinder = ... 
    futures.add(executor.submit(solutionFinder)); 
} 

その後うまくあなたがで完了したいとき:

long endTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeMax); 

し、適切な量を待って、先物を反復処理あなたが終わりの時までにあなたを連れて行く時間があります。

for (Future<?> future : futures) { 
    future.cancel(true); 
} 
0

あなたはどちらかを追加する必要があるとしている:(これは完成Future Sには影響しません)、最後に

for (Future<?> future : futures) { 
    if (!future.isDone()) { 
    long timeout = endTime - System.nanoTime(); 
    if (timeout >= 0) { 
     try { 
     future.get(timeout, TimeUnit.NANOSECONDS); 
     } catch (TimeoutException e) { 
     // Handle it. 
     } 
    } 
    } 
} 

をすべてキャンセル:それはそれまでに完了していない場合は、タスクをキャンセル各スレッドの割り込みをチェックするか、アルゴリズムをより小さな部分に分割して実行するコード。あなたのタイムアウトがソフトリミットであり、あなたが行う仕事が小さく、かなり規則的である場合、第2のオプションを実行することは実行可能かもしれません。

あなたはランナブルのステップのそれぞれを持っており、現在のソリューションのそれぞれについて、次の最も近い都市を見つけることができるTSPの素朴な実装を想像:

CompletionService<PartialSolution> comp = new ExecutorCompletionService<>(executor); 

// Submit initial runners 
comp.submit(new PartialSolutionTask()); 
... 

long current = System.currentTimeMillis(); 
// Only wait 30 total seconds 
final long end = current + TimeUnit.SECONDS.toMillis(30); 

while (current < end) { 
    Future<PartialSolution> f = comp.poll(end - current, TimeUnit.MILLISECONDS); 
    if (f == null) { 
     // Timeout 
     break; 
    } 

    // Submit a new task using the results of the previous task 
    comp.submit(new PartialSolutionTask(f.get())); 

    current = System.currentTimeMillis(); 

} 
// Still should have the same number of running tasks so wait for them to finish by calling comp.poll() N times 

は、残念ながら、あなたは、時間の制限を超えていくつかに行くつもりですそれらの最終走行が終了するのを待つが、実行時間が十分に短い場合は実行可能でなければならない。

0

実は、私は(あなたよりどちらが悪化したり、より良いと思う)別の解決策を見つけた:ThreadedSolutionFinderで

public Solution executeThreads() { 
    List<Solution> solutions = new ArrayList<Solution>(countThreads); 
    List<Callable<Object>> threads = new ArrayList<Callable<Object>>(countThreads); 
    final ScheduledExecutorService executor = Executors.newScheduledThreadPool(countThreads + 1); 

    //Thread that cancel all the other one after the timeout 
    executor.schedule(new Runnable() { 
     public void run() { 
      executor.shutdownNow(); 
     } 
    }, timeMax, TimeUnit.SECONDS); 

    //Threads that compute 
    for(int i = 0; i < countThreads; i++) { 
     Solution currentSolution = new Solution(); 
     solutions.add(currentSolution); 

     ThreadedSolutionFinder solutionFinder = 
       new ThreadedSolutionFinder(i, instance, currentSolution); 

     threads.add(Executors.callable(solutionFinder)); 
    } 

    long startTime = System.currentTimeMillis(); 

    /* Execute and then wait that all threads have finished */ 
    try { 
     executor.invokeAll(threads); 

    } catch (InterruptedException e) { 
     e.printStackTrace(); 
     return null; 
    } 

    long time = System.currentTimeMillis() - startTime; 
    System.out.println("---------------------------------------"); 
    System.out.println("All tasks terminated in " + time/1000.f + "s"); 

    //Iterate over all the found solutions 
    //... 

:都市の大きなインスタンス上

@Override 
public void run() { 
    thread = Thread.currentThread(); 
    solution.setOF(Double.MAX_VALUE); //No solution so it has the higher cost 

    do { 
     Solution randomSolution = generateRandomSolution(); 
     Solution localSearchSol = localSearch(randomSolution); 
     System.out.println("[Worker " + workerId + 
       "] " + randomSolution.getOF() + "\t-> " + localSearchSol.getOF()); 

     if(localSearchSol.getOF() < solution.getOF()) { 
      //Copy the elements because the solution reference must not change 
      solution.clear(); 
      solution.addAll(localSearchSol); 
      solution.setOF(localSearchSol.getOF()); 
     } 

    } while(!thread.isInterrupted()); 
} 

private Solution generateRandomSolution() { 
    Solution solution = new Solution(); 

    /* We add all the city (index) to the solution, 
    * no more things are required. TSPCostCalculator 
    * do the trick */ 
    for(int i = 0; i < instance.getN(); i++) { 
     solution.add(i); 
    } 

    //Randomize the solution indices (cities) 
    Collections.shuffle(solution, ThreadLocalRandom.current()); 

    //Compute the efficiency of the solution 
    solution.setOF(TSPCostCalculator.calcOF(instance, solution)); 

    return solution; 
} 

/* Return the best solution among many changed solution 
* (local search algorithm) 
* @param generatedSolution The solution to begin with 
* @return the best solution found with the algorithm, 
* null if no better solution 
*/ 
private Solution localSearch(Solution solution) { 
    boolean continueExploration = true; 
    Solution bestSolution = solution; 

    while(continueExploration && !thread.isInterrupted()) 
    { 
     Solution swapSolution; 

     swapSolution = exploreNeighborhood(bestSolution); 
     //TODO: Solve this, computeSwapCost is inaccurate 
     if((float)swapSolution.getOF() < (float)bestSolution.getOF()) 
      bestSolution = swapSolution; 
     else 
      continueExploration = false; 
    } 

    return bestSolution; 
} 

/* Return the best solution among many changed solution 
* (local search algorithm) 
* @param generatedSolution The solution to begin with 
* @return the best solution found with the algorithm 
*/ 
private Solution exploreNeighborhood(Solution solution) { 
    Solution bestSolution = solution; 
    Solution swapSolution = solution.clone(); 

    for(int i = 0; i < solution.size() && !thread.isInterrupted(); i++) 
    { 
     for(int j = i + 1; j < solution.size() && !thread.isInterrupted(); j++) 
     { 
      double costBefore = swapSolution.getOF(); 

      double relativeCostBefore = computeSwapCost(swapSolution, i, j); 
       swapSolution.swap(i, j); 
       double relativeCostAfter = computeSwapCost(swapSolution, i,  j); 
       double diffCost = relativeCostBefore - relativeCostAfter; 

       swapSolution.setOF(costBefore - diffCost); 

       if(swapSolution.getOF() < bestSolution.getOF()) 
        bestSolution = swapSolution.clone(); 
      } 
     } 

     return bestSolution; 
    } 
} 

(例えば、10k)、プログラムはすぐに停止します。唯一の問題(私たちが話していた設計と関係ないかもしれません)は、4つの物理コア上でさえ、2または3スレッドが最高のTSPソリューションを提供するということです。 2つ以上のスレッドを持つ事実は、すべてのスレッド間で悪い解決策を与えます(非常に奇妙です、私は同期がないので説明できません)。

関連する問題