13

consuming句でスローされた例外(たとえば、forEachの処理中)でJava 8の並列ストリームがどのように動作するのですか?たとえば、次のコードを入力します。スローされた例外でJava 8の並列ストリームはどのように動作しますか?

final AtomicBoolean throwException = new AtomicBoolean(true); 
IntStream.range(0, 1000) 
    .parallel() 
    .forEach(i -> { 
     // Throw only on one of the threads. 
     if (throwException.compareAndSet(true, false)) { 
      throw new RuntimeException("One of the tasks threw an exception. Index: " + i); 
     }); 

処理された要素はすぐに停止しますか?既に開始された要素が終了するのを待つか?すべてのストリームが終了するのを待っていますか?例外がスローされた後でストリーム要素を処理し始めますか?

いつ復帰しますか?例外の直後ですか?すべての/要素の一部が消費者によって処理された後?

パラレルストリームが例外をスローした後でDo要素が処理され続けますか? (これが起こった場合を見つけた)。

一般的なルールはありますか?並列ストリーム戻りが早く、私はそれは確定的ではないことがわかったかどうかを判断しようとすると

EDIT(15-11-2016)

@Test 
public void testParallelStreamWithException() { 
    AtomicInteger overallCount = new AtomicInteger(0); 
    AtomicInteger afterExceptionCount = new AtomicInteger(0); 
    AtomicBoolean throwException = new AtomicBoolean(true); 

    try { 
     IntStream.range(0, 1000) 
      .parallel() 
      .forEach(i -> { 
       overallCount.incrementAndGet(); 
       afterExceptionCount.incrementAndGet(); 
       try { 
        System.out.println(i + " Sleeping..."); 
        Thread.sleep(1000); 
        System.out.println(i + " After Sleeping."); 
       } 
       catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
       // Throw only on one of the threads and not on main thread. 
       if (!Thread.currentThread().getName().equals("main") && throwException.compareAndSet(true, false)) { 
        System.out.println("Throwing exception - " + i); 
        throw new RuntimeException("One of the tasks threw an exception. Index: " + i); 
       } 
      }); 
     Assert.fail("Should not get here."); 
    } 
    catch (Exception e) { 
     System.out.println("Cought Exception. Resetting the afterExceptionCount to zero - 0."); 
     afterExceptionCount.set(0); 
    } 
    System.out.println("Overall count: " + overallCount.get()); 
    System.out.println("After exception count: " + afterExceptionCount.get()); 
} 

後期リターンないから投げメインスレッドこれにより、例外がスローされた後に多くの新しい要素が処理されました。私のマシンでは、例外がスローされた後に約200個の要素が処理されました。しかし、1000要素すべてが処理されたわけではありません。それでは、ここで何がルールですか?例外がスローされたにもかかわらず、より多くの要素が処理されたのはなぜですか?

!)記号を削除すると、メインスレッドに例外がスローされるため、早期返品が返されます。すでに開始された要素だけが処理を終了し、新しい要素は処理されませんでした。早く帰ってきたのはこの場合でした。以前の動作と一致しません。

私はここで何が欠けていますか?

答えて

4

ステージの1つで例外がスローされると、他の操作が完了するのを待たずに、例外が呼び出し元に再スローされます。 それがForkJoinPoolがそれをどのように処理するかです。

対照的にfindFirstは並行して実行すると、すべての操作が処理を終了した後でのみ結果を呼び出し元に提示します(結果がすべての操作を終了する前に判明している場合でも)。

言い換えれば、それは早期に戻りますが、実行中のタスクはすべて終了します。これは非常にホルガーの回答(コメント内のリンク)によって説明されるが、ここでいくつかの詳細がある最後のコメント

に答えるために

EDIT。

1)メインスレッドをすべて終了すると、これらのスレッドで処理されるはずのすべてのタスクも強制終了されます。だから数字は1000のタスクと4つのスレッドがあるので実際には250くらいになるはずですが、これは3を返すと思いますか?:1000のタスクは

int result = ForkJoinPool.getCommonPoolParallelism(); 

がある理論的、それぞれが250個のタスクを処理することになって4つのスレッド、その後、あなたは750回の作業が失われた意味それらの3を殺すがあります。 実行するタスクは250個あり、ForkJoinPoolはこれらの250個の左タスクを実行するために3つの新しいスレッドにまたがります。

あなたは(サイズのない流れを作る)このようなあなたのストリームを変更する、試すことができますいくつかのこと:

IntStream.generate(random::nextInt).limit(1000).parallel().forEach 

この時、最初のスプリット指標が不明であるため、エンディングより多くの操作があるだろうと他の戦略によって選択される。これに

if (!Thread.currentThread().getName().equals("main") && throwException.compareAndSet(true, false)) { 

if (!Thread.currentThread().getName().equals("main")) { 

あなたは、常に新しいスレッドにより作成されません特定の時点まで、メイン以外のすべてのスレッドを殺すこの時間何も試みることができることは、これを変更でタスクとしてのForkJoinPoolは小さすぎるため、他のスレッドは必要ありません。この場合、より少ないタスクで終了します。

2)2番目の例では、実際にコードが実行されるようにメインスレッドを強制終了すると、実際に他のスレッドが実行されることはありません。変更してください:

} catch (Exception e) { 
     System.out.println("Cought Exception. Resetting the afterExceptionCount to zero - 0."); 
     afterExceptionCount.set(0); 
    } 

    // give some time for other threads to finish their work. You could play commenting and de-commenting this line to see a big difference in results. 
    TimeUnit.SECONDS.sleep(60); 

    System.out.println("Overall count: " + overallCount.get()); 
    System.out.println("After exception count: " + afterExceptionCount.get()); 
+0

いくつかのドキュメントに基づいていますか? –

+3

@ AlikElzin-kilaka本当はそうではありませんが、これは文書化されていないと思います。私はこれを覚えていますので、このバグを参照している他の質問を読んでください:https://bugs.openjdk.java.net/browse/JDK-8164690 – Eugene

+2

@ AlikElzin-kilaka [これ](http:// mail.openjdk.java.net/pipermail/core-libs-dev/2016-August/042972.html)のディスカッションスレッドで、Eugeneが指摘しているJBSのバグを引き起こしたcore-libs-devメーリングリストに掲載されました。 –

関連する問題