デフォルトでは、Javaストリームはデフォルトのパラメータで構築されたcommon thread poolによって処理されます。 another questionで回答されているように、カスタムプールを指定するか、java.util.concurrent.ForkJoinPool.common.parallelism
システムパラメータを設定することによって、これらのデフォルトを調整できます。デフォルトのスレッド数を超えるJavaストリームをどのように処理できますか?
しかし、これら2つの方法のいずれかによってストリーム処理に割り当てられるスレッドの数を増やすことができませんでした。例として、最初の引数で指定されたファイルに含まれるIPアドレスのリストを処理し、解決されたアドレスを出力する以下のプログラムを考えてみましょう。これを約13000の一意のIPアドレスを持つファイルで実行すると、Java Java Mission Controlわずか16個のスレッドしか使用できません。これらのうち、わずか5人が労働者です(ForkJoinPool
)。しかし、この特定のタスクは、スレッドがDNS応答を待つ時間をほとんど費やすため、さらに多くのスレッドから利益を得るでしょう。だから私の質問は、どのように実際に使用されるスレッドの数を増やすことができますですか?
私は3つの環境でプログラムを試しました。これらはOSが報告したスレッド数です。 17件のスレッド
- のJava SE Runtime EnvironmentのWindows 7が実行されている8コアマシン上1.8.0_73-B02を構築ダーウィン15.2.0:FreeBSDの11.0を実行している24コアマシン上の23件のスレッド
- OpenJDKはバージョン1.8.0_72:44件のスレッド
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ForkJoinPool;
/** Resolve IP addresses in file args[0] using 100 threads */
public class Resolve100 {
/** Resolve the passed IP address into a name */
static String addressName(String ipAddress) {
try {
return InetAddress.getByName(ipAddress).getHostName();
} catch (UnknownHostException e) {
return ipAddress;
}
}
public static void main(String[] args) {
Path path = Paths.get(args[0]);
ForkJoinPool fjp = new ForkJoinPool(100);
try {
fjp.submit(() -> {
try {
Files.lines(path)
.parallel()
.map(line -> addressName(line))
.forEach(System.out::println);
} catch (IOException e) {
System.err.println("Failed: " + e);
}
}).get();
} catch (Exception e) {
System.err.println("Failed: " + e);
}
}
}
try-with-resourcesステートメントに 'Files.lines()'を囲む必要があります。 – fge
これをparallel()しようとする前に、リストに行を追加することをお勧めします。あらかじめいくつのエントリーがあるかを知っていると、はるかに良い仕事をします。 –