2016-02-23 12 views
6

デフォルトでは、Javaストリームはデフォルトのパラメータで構築されたcommon thread poolによって処理されます。 another questionで回答されているように、カスタムプールを指定するか、java.util.concurrent.ForkJoinPool.common.parallelismシステムパラメータを設定することによって、これらのデフォルトを調整できます。デフォルトのスレッド数を超えるJavaストリームをどのように処理できますか?

しかし、これら2つの方法のいずれかによってストリーム処理に割り当てられるスレッドの数を増やすことができませんでした。例として、最初の引数で指定されたファイルに含まれるIPアドレスのリストを処理し、解決されたアドレスを出力する以下のプログラムを考えてみましょう。これを約13000の一意のIPアドレスを持つファイルで実行すると、Java Java Mission Controlわずか16個のスレッドしか使用できません。これらのうち、わずか5人が労働者です(ForkJoinPool)。しかし、この特定のタスクは、スレッドがDNS応答を待つ時間をほとんど費やすため、さらに多くのスレッドから利益を得るでしょう。だから私の質問は、どのように実際に使用されるスレッドの数を増やすことができますですか?

私は3つの環境でプログラムを試しました。これらはOSが報告したスレッド数です。 17件のスレッド

  • のJava SE Runtime EnvironmentのOS Xを実行している2コアマシン上1.8.0_66-B17を構築:

    • のJava SE Runtime EnvironmentのWindows 7が実行されている8コアマシン上1.8.0_73-B02を構築ダーウィン15.2.0:FreeBSDの11.0を実行している24コアマシン上の23件のスレッド
    • OpenJDKはバージョン1.8.0_72:44件のスレッド
    
    import java.io.IOException; 
    import java.net.InetAddress; 
    import java.net.UnknownHostException; 
    import java.nio.file.Files; 
    import java.nio.file.Files; 
    import java.nio.file.Path; 
    import java.nio.file.Paths; 
    import java.util.concurrent.ForkJoinPool; 
    
    /** Resolve IP addresses in file args[0] using 100 threads */ 
    public class Resolve100 { 
        /** Resolve the passed IP address into a name */ 
        static String addressName(String ipAddress) { 
         try { 
          return InetAddress.getByName(ipAddress).getHostName(); 
         } catch (UnknownHostException e) { 
          return ipAddress; 
         } 
        } 
    
        public static void main(String[] args) { 
         Path path = Paths.get(args[0]); 
         ForkJoinPool fjp = new ForkJoinPool(100); 
         try { 
          fjp.submit(() -> { 
           try { 
            Files.lines(path) 
            .parallel() 
            .map(line -> addressName(line)) 
            .forEach(System.out::println); 
           } catch (IOException e) { 
            System.err.println("Failed: " + e); 
           } 
          }).get(); 
         } catch (Exception e) { 
          System.err.println("Failed: " + e); 
         } 
        } 
    } 
    
  • +2

    try-with-resourcesステートメントに 'Files.lines()'を囲む必要があります。 – fge

    +2

    これをparallel()しようとする前に、リストに行を追加することをお勧めします。あらかじめいくつのエントリーがあるかを知っていると、はるかに良い仕事をします。 –

    答えて

    6

    あなたのアプローチには2つの問題があります。これがin the following way定義されているカスタムFJPを使用すると、ストリームAPIによって作成された個々のタスクの最大数を変更しないことを第一です:

    static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2; 
    

    ですから、カスタムプールを使用している場合でも、並列タスクの数が制限されますcommonPoolParallelism * 4。 (それは実際にはハード制限ではなく、ターゲットですが、多くの場合、タスクの数はこの数に等しい)。

    上記の問題はjava.util.concurrent.ForkJoinPool.common.parallelismシステムプロパティを使用して修正できますが、ここで別の問題が発生します。Files.linesは本当にひどく並列化されています。詳細については、this questionを参照してください。特に、13000の入力ラインの場合、最大100のCPUがあっても、可能な最大スピードアップは3.17倍です(すべてのライン処理がほぼ同じ時間を要すると仮定します)。私のStreamExライブラリは、これを回避するために用意されています(StreamEx.ofLines(path).parallel()を使用してストリームを作成してください)。別の可能な解決策は、それから並列ストリームを作成し、順次Listにファイルの行を読み取ることである。

    Files.readAllLines(path).parallelStream()... 
    

    これは、システムプロパティと一緒に働くことになります。しかし、一般的に、ストリームAPIは、タスクがI/Oを伴う場合、並列処理にはあまり適していません。あなたは、システムのプロパティを微調整する必要はありませんし、別のタスクのために別のプールを使用することができます

    ForkJoinPool fjp = new ForkJoinPool(100); 
    List<CompletableFuture<String>> list = Files.lines(path) 
        .map(line -> CompletableFuture.supplyAsync(() -> addressName(line), fjp)) 
        .collect(Collectors.toList()); 
    list.stream().map(CompletableFuture::join) 
        .forEach(System.out::println); 
    

    この方法:より柔軟なソリューションは、行ごとにCompletableFutureを使用することです。

    +0

    スレッドの数を変更するためのこのテクニックは完全に実装依存で、何も指定されていない振る舞いで、開発者は – Holger

    +0

    @Holgerに依存する必要がありますが、私はあなたが.submitメソッドを意味すると思います。 –

    +0

    ありがとうございます! CompletableFutureアプローチは、実際には100スレッドを生成し、1桁のスピードアップを提供します。ここに数字があります。 オリジナル:48m40.036s; CompletableFuture:0m37.465s。 (元のバージョンはウォームDNSキャッシュ上でも動作します) –

    関連する問題