2016-03-02 8 views
5

私はかなりのデータセットを持っていますが、遅いがクリーンなメソッドを呼び出し、最初のメソッドの結果に副作用を伴うファーストメソッドを呼び出したい。私は中間結果に興味がないので、私はそれらを収集したくないです。並列ストリームを連続して呼び出すと、すべての以前の操作が順次行われます。

明白な解決策は、パラレルストリームを作成し、スローコールを行い、ストリームを再びシーケンシャルにし、高速コールを行うことです。問題は、すべてのコードが単一スレッドで実行されていることです。実際の並列処理はありません。

例コード:

@Test 
public void testParallelStream() throws ExecutionException, InterruptedException 
{ 
    ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2); 
    Set<String> threads = forkJoinPool.submit(()-> new Random().ints(100).boxed() 
      .parallel() 
      .map(this::slowOperation) 
      .sequential() 
      .map(Function.identity())//some fast operation, but must be in single thread 
      .collect(Collectors.toSet()) 
    ).get(); 
    System.out.println(threads); 
    Assert.assertEquals(Runtime.getRuntime().availableProcessors() * 2, threads.size()); 
} 

private String slowOperation(int value) 
{ 
    try 
    { 
     Thread.sleep(100); 
    } 
    catch (InterruptedException e) 
    { 
     e.printStackTrace(); 
    } 
    return Thread.currentThread().getName(); 
} 

Iはsequentialを削除する場合、予想されるように実行されるコードは、しかし、明らかに、非並列動作は、複数のスレッドで呼び出すであろう。

一時的な収集を避けるために、このような動作についての参考資料や、何らかの方法をお勧めしますか?

答えて

5

は、最初のストリームAPIの設計で働いていたが、多くの問題を引き起こし、最終的に実装がchangedだったので、それだけで全体のパイプラインのためのオンとオフの並列フラグをオンにします。現在のドキュメントは、実際曖昧であり、それはJava-9に改善された:

ストリームパイプラインは、端末操作が起動されたストリームのモードに応じて、順次又は並行して実行されます。ストリームのシーケンシャルまたはパラレルモードはBaseStream.isParallel()メソッドで確認でき、ストリームのモードはBaseStream.sequential()およびBaseStream.parallel()操作で変更できます。最新のシーケンシャルモードまたはパラレルモードの設定は、ストリームパイプライン全体の実行に適用されます。あなたの問題については

、あなたは中間Listにすべてを収集し、新しいシーケンシャルパイプラインを開始することができます:返信用

new Random().ints(100).boxed() 
     .parallel() 
     .map(this::slowOperation) 
     .collect(Collectors.toList()) 
     // Start new stream here 
     .stream() 
     .map(Function.identity())//some fast operation, but must be in single thread 
     .collect(Collectors.toSet()); 
+1

あなたが引用した文章は、Java 8のバージョンとまったく同じです。クラスドキュメントの最後の段落と同じ場所にあります。一般に、詳細は、[パッケージのドキュメント](https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html#StreamOps)を参照してください(「パラレル化」を参照) )(パラレル/シーケンシャル・モードでのみならず(「特定の方法」(https://docs.oracle.com/javase/8/docs/api/java/util/stream/BaseStream.html#parallel--)よりも)たとえば、削減と比較してください)。 – Holger

+0

よく目撃された!私はそれが[更新された]ことを知っていた(http://hg.openjdk.java.net/jdk9/dev/jdk/rev/d52b2d49bf04)(私は議論に参加していて[確信して](http://mail.openjdk。 java.net/pipermail/core-libs-dev/2015-August/034773.html)Stuartは 'concat'のために特別な注釈を追加しましたが、なんらかの理由で間違った場所を発見しました。投稿が編集されました。 –

1

現在の実装では、Streamはすべてパラレルまたはすべてシーケンシャルです。 Javadocはこれについて明示的ではなく、今後変更される可能性がありますが、これは可能です。

Sと平行()

は平行である等価ストリームを返します。ストリームがすでに平行であったか、または基本ストリーム状態が並列に変更されたために、自身を返すことがあります。

この機能をシングルスレッドにする必要がある場合は、ロックまたは同期ブロック/メソッドを使用することをお勧めします。 sequential()parallel()からのストリームを切り替え

+0

おかげで、しかし、同期方法がボトルネックと中間コレクション(JMHによって確認)より高速に動作となります。この特定のケースでは、私はパフォーマンスとメモリにもっと関心があります。 – the20login

関連する問題