2

データベースから2つの並列読み取りを実装しました。 最初の実装では、ExecutorServicenewCachedThreadPool()コンストラクタと先物を使用しています。私は単に、読んでいるケースごとに将来を返すコールを行い、すべての呼び出しを行った後、get()を呼び出します。この実装は正常に機能し、十分に高速です。Java並列ストリームは、newCachedThreadPool()を使用するときに最適なスレッド数を使用しません。

第2の実装では、並列ストリームを使用しています。私は並列ストリームコールを同じExecutorServiceプールに入れるとほとんど5倍遅くなりますと私は望むほど多くのスレッドを使用していないようです。私は代わりにForkJoinPool pool = new ForkJoinPool(50)にそれを入れると、以前の実装と同じくらい速く動作します。

私の質問は:

なぜアンダー利用newCachedThreadPoolバージョンでスレッドを並列ストリーム?これはForkJoinTask.forkが実装されているどのように関係している

private static final ExecutorService pool = Executors.newCachedThreadPool(); 

final List<AbstractMap.SimpleImmutableEntry<String, String>> simpleImmutableEntryStream = 
       personIdList.stream().flatMap(
         personId -> movieIdList.stream().map(
           movieId -> new AbstractMap.SimpleImmutableEntry<>(personId, movieId))).collect(Collectors.toList()); 

final Future<Map<String, List<Summary>>> futureMovieSummaryForPerson = pool.submit(() -> { 
     final Stream<Summary> summaryStream = simpleImmutableEntryStream.parallelStream().map(
      inputPair -> { 
        return FeedbackDao.find(inputPair.getKey(), inputPair.getValue()); 
      }).filter(Objects::nonNull); 
return summaryStream.collect(Collectors.groupingBy(Summary::getPersonId)); 
}); 
+0

あなたのコード –

+0

を表示してください:「ForkJoinPoolプール=新しいForkJoinPool(50);」それは再び速く働く。 –

+0

あなたのコードによれば、あなたはタスクだけを提出しますか?または私は何かが恋しい? –

答えて

2

は、現在のスレッドが来る場合:ここで

は、第2の実施のためのコード(私は1つが、とにかくOK作品引き起こし、最初の実装を掲示するわけではない)であります ForkJoinPoolから同じプールを使用して新しいタスクをサブミットしますが、そうでない場合はローカルマシンの総プロセッサー量の共通プールを使用します。ここで Executors.newCachedThreadPool()でプールを作成すると、このプールによって作成されるスレッドは共通プールを使用するように ForkJoinPoolからのものとして認識されません。ここで

は、それがどのように実装されるか、それはあなたがよりよく理解するのに役立つはずです。

public final ForkJoinTask<V> fork() { 
    Thread t; 
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) 
     ((ForkJoinWorkerThread)t).workQueue.push(this); 
    else 
     ForkJoinPool.common.externalPush(this); 
    return this; 
} 

それが下で、共通のプールを使用しますForkJoinWorkerThreadようなタイプではありませんプールExecutors.newCachedThreadPool()によって作成されたスレッド新しいタスクをサブミットするためにプールサイズを最適化しました。私はとのコードの最初の行交換するとき

+0

この実装のコードを追加しました。ご覧のように、私はparallelstreamのデフォルトプールを使用しているのではなく、別々に作成されたExecutorService内で柔軟なスレッド数で実行します。 –

+0

リンクのためのThx、それは実際にあなたに答えを与える –

+0

私は見る。あなたが正しいです。これは、 "無制限"プールで並列ストリームを実行できないということですか? ForkJoinPoolにプールをキャッシュする方法があるかどうか知っていますか? –

関連する問題