3

私は普通xargsと並行して実行する科学的アプリケーションを持っていますが、このスキームは繰り返しJVM開始コストを招き、キャッシュされたファイルI/OとJITコンパイラを無視します。私はすでにコードをスレッドプールを使用するように改造しましたが、私は出力を保存する方法についていません。単純な非同期I/O:多くのスレッド、1つのファイル

プログラム(つまり、新しいプログラムの1つのスレッド)は2つのファイルを読み込み、処理して結果を標準出力に出力します。現在は、各スレッドに結果の文字列をBlockingQueueに追加させることで、出力を処理しました。 Booleanフラグがtrueである限り、別のスレッドはキューから取り出してファイルに書き込みます。その後、私はawaitTerminationとフラグをfalseに設定し、ファイルを閉じてプログラムを終了させます。

私の解決策はちょっと不思議そうです。これを達成するための最も簡単で最良の方法は何ですか? 多くのスレッドの主結果データを1つのファイルに書き込む方法を教えてください。

たとえば、広範に適用可能な方法の場合、答えはJava固有である必要はありません。

更新

私はポイズンピルとして "STOP" を使用しています。

while (true) { 
    String line = queue.take(); 
    if (line.equals("STOP")) { 
     break; 
    } else { 
     output.write(line); 
    } 
} 
output.close(); 

私は手動でのジョブが終了し、最終的にキューを毒殺し、消費者のスレッドに参加するのを待ち、その後、スレッドプールにジョブを追加し、キューのかかるスレッドを起動します。

+0

このフラグを使用する場合は、静的なブール値フィールドの場合は、必ず「volatile」として宣言してください。それ以外の場合は、フィールドへの読み書きに同期を追加しない限り、動作しません( 'static final AtomicBoolean'も使用できます)。 Anywhere、私はWill Hartungによると "poison pill"スキーマに固執します。 –

答えて

4

これは本当にあなたがそれをやりたい方法です。スレッドが出力をキューに入れてから、ライターに排気させてもらいます。

フラグをチェックするのではなく、作者が完了したことを知るためにキューに「すべて完了」トークンを入れるだけです。そうすれば、帯域外の信号を送る必要はありません。

これは簡単なことですが、よく知られている文字列、列挙型、または単に共有オブジェクトを使用できます。

2

ExecutorServiceを使用できます。 タスクを実行し、完了後に文字列を返すようにCallableを送信します。

Callableを送信する場合はFutureが得られます。リストで

次に、Futureを繰り返して、Future#getを呼び出して文字列を取得します。 タスクがまだ完了していない場合は、タスクが完了するまでブロックされ、そうでない場合はすぐに値が返されます。

例:

ExecutorService exec = Executors.newFixedThreadPool(10); 
List<Future<String>> tasks = new ArrayList<Future<String>>(); 
tasks.add(exec.submit(new Callable<String> { 
    public String call() { 
     //do stuff 
     return <yourString>; 
    } 
})); 

//and so on for the other tasks 

for (Future<String> task : tasks) { 
    String result = task.get(); 
    //write to output 
} 
+0

awaitTermination()は、すべてのタスクがシャットダウン要求後に*実行を完了するまでブロックします。* "したがって、すべてのタスクをサブミットした後にshutdown()を呼び出す必要があります。しかし、Future.get()もブロックするので、awaitTermination()の呼び出しは必要ありません。 –

+0

あなたが正しいです、シャットダウンと待つ必要はありません。ありがとう! – emboss

1

多くのスレッド処理、それらの間の1個のスレッドの書き込み、メッセージキューは良い戦略です。解決しなければならない問題は、すべての作業がいつ終了したかを知ることです。これを行う1つの方法は、開始したワーカースレッドの数を数え、その後にどれだけ多くの応答があるかを数えることです。この疑似コードのようなもの:

このアプローチは、ワーカーが実行中にさらに多くの作業項目を見つけることができる場合にも機能します。ワーカーの応答にまだ処理されていない作業を追加してから、ワーカーの数を増やし、いつものようにワーカースレッドを開始します。

各ワーカーが1つのメッセージを返す場合は、JavaのExecutorServiceを使用して結果を返すCallableインスタンスを実行できます。 ExecutorServiceのメソッドは、Callableの作業が終了したときに結果を取得できるFutureインスタンスにアクセスします。

まず、すべてのタスクをExecutorServiceにサブミットしてから、すべてのFuturesをループしてその応答を取得します。そうすれば、先物をチェックする順番で回答を書くことができます。これは、自分の仕事を終える順序と異なる場合があります。レイテンシが重要でない場合、それは問題ではありません。それ以外の場合は、メッセージキュー(上記のとおり)が適している可能性があります。

0

出力ファイルに何らかの定義済みの順序があるかどうか、またはそこにデータをダンプするだけでは不明です。私はそれは順序がないと仮定します。

出力用の書き込み用に余分なスレッドが必要な理由がわかりません。ただ​​ファイルに書き込み、各スレッドの最後にそれを呼び出すメソッド。

0

複数のスレッドが同じファイルに書き込んでいる場合、最も簡単なことは、タスク内のそのファイルに書き込むことです。

final PrintWriter out = 
ExecutorService es = 
for(int i=0;i<tasks;i++) 
    es.submit(new Runnable() { 
     public void run() { 
      performCalculations(); 
      // so only one thread can write to the file at a time. 
      synchornized(out) { 
       writeResults(out); 
      } 
     } 
    }); 
es.shutdown(); 
es.awaitTermination(1, TimeUnit.HOUR); 
out.close(); 
関連する問題