2011-10-11 9 views
34

各行がの実行可能ファイルを生成し、スレッドプールによって実行される7000万行以上の.csvファイルがあります。このRunnableはMysqlにレコードを挿入します。シャットダウン後にスレッドプールを再利用する方法

さらに、RandomAccessFileのcsvファイルの位置を記録します。位置はファイルに書き込まれます。スレッドプール内のすべてのスレッドが完了すると、このレコードを書き込みます。ThreadPoolExecutor.shutdown()が呼び出されます。しかし、より多くの行が来たら、私は再びスレッドプールが必要です。新しいスレッドプールを作成する代わりに、この現在のスレッドプールを再利用するにはどうすればよいですか?次のように

コードは次のとおりです。

public static boolean processPage() throws Exception { 

    long pos = getPosition(); 
    long start = System.currentTimeMillis(); 
    raf.seek(pos); 
    if(pos==0) 
     raf.readLine(); 
    for (int i = 0; i < PAGESIZE; i++) { 
     String lineStr = raf.readLine(); 
     if (lineStr == null) 
      return false; 
     String[] line = lineStr.split(","); 
     final ExperienceLogDO log = CsvExperienceLog.generateLog(line); 
     //System.out.println("userId: "+log.getUserId()%512); 

     pool.execute(new Runnable(){ 
      public void run(){ 
       try { 
        experienceService.insertExperienceLog(log); 
       } catch (BaseException e) { 
        e.printStackTrace(); 
       } 
      } 
     }); 

     long end = System.currentTimeMillis(); 
    } 

    BufferedWriter resultWriter = new BufferedWriter(
      new OutputStreamWriter(new FileOutputStream(new File(
        RESULT_FILENAME), true))); 
    resultWriter.write("\n"); 
    resultWriter.write(String.valueOf(raf.getFilePointer())); 
    resultWriter.close(); 
    long time = System.currentTimeMillis()-start; 
    System.out.println(time); 
    return true; 
} 

ありがとう!

答えて

31

に提出し、あなたがシャットダウンされていExecutorServiceを再利用することはできません。私はの回避策を勧めます()。これは、(a)すべての状況で期待どおりに動作しない可能性があります。 (b)標準クラスを使用して、あなたが望むものを達成することができます。

あなたはどちらか

  1. が新しいExecutorServiceをインスタンス化する必要があります。または

  2. ExecutorServiceを終了しません。

最初の解決策は簡単に実装されるため、詳しくは説明しません。

第2の理由は、提出されたタスクがすべて終了した後にアクションを実行したいから、ExecutorCompletionServiceを参照して代わりに使用することがあります。これは、スレッド管理を行いますExecutorServiceをラップし、それはあなたに戻って報告できるようランナブルは、彼らが終了したらExecutorCompletionServiceを教えてくれるものに包まれます:

ExecutorService executor = ...; 
ExecutorCompletionService ecs = new ExecutorCompletionService(executor); 

for (int i = 0; i < totalTasks; i++) { 
    ... ecs.submit(...); ... 
} 

for (int i = 0; i < totalTasks; i++) { 
    ecs.take(); 
} 

方法take()ExecutorCompletionServiceにクラスは、タスクが(通常または突然のどちらかで)終了するまでブロックします。 Futureが返されますので、必要に応じて結果を確認することができます。

私はあなたの問題を完全に理解していないので、これがあなたを助けてくれることを願っています。

+0

強制的にスレッドを停止するか、すべてのタスクをキャンセルしますか? exectuer.shutdownNow()を使用していますか?このラッパーでも正常に動作しますか? –

3

documentationに述べたように作成し、グループのすべての作業をして(すべてのタスクが完了したときにのみ返します)invokeAllとプール

+0

'ExecutorCompletionService'または' invokeAll'を使う方が良いでしょうか? –

1

ExecutorServiceでシャットダウンを呼び出した後、no new Task will be accepted。これにより、各ラウンドのタスクごとに新しいExecutorServiceを作成する必要があります。

ただし、Java 8 ForkJoinPool.awaitQuiescenceが導入されました。通常のExecutorServiceからForkJoinPoolに切り替えることができる場合は、このメソッドを使用して、shutdownを呼び出さずにForkJoinPoolで実行中のタスクがなくなるまで待機できます。これは、ForkJoinPoolをTasksで満たし、空(静止)になるまで待ってからTasksでもう一度埋めていくことを意味します。

関連する問題