2017-08-20 10 views
2

私は、スケーラFuture [T]型の非同期計算パフォーマンスをテストするマージソートを作成しました。Scala Future [T]オーバーヘッド?

私は4コアのCPUを持っていますので、私は完全なCPU能力(サブタスクのサイズは同じであるため、ストール時間は小さくすべきです)を使用しているため、ただし、結果は、非同期マージソートが通常のマージソートより遅いことを示しています。

それは私がひどい同時性を書いているのでしょうか、それともFuture [T]オーバーヘッドのためですか?誰も私がこれを説明するのを助けることができますか?

package kai.concurrent 

import scala.concurrent.duration.Duration 
import scala.concurrent.{Await, Future} 
import scala.concurrent.ExecutionContext.Implicits.global 
import scala.util.Random 

object MergeSort { 
    lazy val regressThreadhold = 10000 

    def mergeSortedList[T](a: Seq[T], b: Seq[T])(implicit ord: Ordering[T]): Seq[T] = { 
    def loop(a: Seq[T], b: Seq[T], acc: Seq[T]): Seq[T] = { 
     if (a.isEmpty && b.isEmpty) acc 
     else if (a.isEmpty) b.reverse ++: acc 
     else if (b.isEmpty) a.reverse ++: acc 
     else if (ord.lt(a.head, b.head)) loop(a.tail, b, a.head +: acc) 
     else loop(a, b.tail, b.head +: acc) 
    } 

    loop(a, b, Seq()).reverse 
    } 

    def mergeSortAsync0[T](x: Seq[T])(implicit ord: Ordering[T]): Future[Seq[T]] = 
    if (x.size <= regressThreadhold) Future(mergeSort(x)) else { 
     val (left, right) = x.splitAt(x.size/2) 
     val Seq(leftSorted, rightSorted) = Seq(left, right).map(seq => Future(mergeSortAsync0(seq)).flatten) 
     leftSorted.zip(rightSorted).map(pair => mergeSortedList(pair._1, pair._2)) 
    } 

    def mergeSortAsync[T](x: Seq[T])(implicit ord: Ordering[T]): Seq[T] = 
    Await.result(mergeSortAsync0(x), Duration.Inf) 

    def mergeSort[T](x: Seq[T])(implicit ord: Ordering[T]): Seq[T] = 
    if (x.size <= 1) x else { 
     val (left, right) = x.splitAt(x.size/2) 
     val (leftSorted, rightSorted) = (mergeSort(left), mergeSort(right)) 
     mergeSortedList(leftSorted, rightSorted) 
    } 
} 

object MergeSortTest extends App { 

    import kai.util.ProfileUtil.TimeResult 

    val seq: Vector[Double] = (1 to 1000000).map(i => Random.nextDouble()).toVector 
    val seqMergeSortAsync = MergeSort.mergeSortAsync(seq) withWallTimePrinted "mergeSortAsync" 
    val seqMergeSort = MergeSort.mergeSort(seq) withWallTimePrinted "mergeSort" 
    val seqSort = seq.sorted withWallTimePrinted "sorted" 
    println(seqSort == seqMergeSort && seqMergeSort == seqMergeSortAsync) 
} 

出力:

mergeSortAsync elapsed time: 3186 ms 

mergeSort elapsed time: 3300 ms 

sorted elapsed time: 581 ms 

true 
+0

は、あなたが複数の呼び出しを超える時間を平均ますか? JVMがウォーミングされないことがあります。また、コードがすべてのコアに当たっていることを確認してください。たとえば、Mac OS Xのアクティビティモニタを見てください。 –

答えて

4

私はあなたのテストをコピーし、JMH(sbt-jmhを使用)を介して、それを実行しました。私は、テストで基礎となる実行コンテキストにあらかじめ定義されたscala.concurrent.ExecutionContext.Implicits.globalを使用しました。

結果:

[info] Benchmark       Mode Cnt Score Error Units 
[info] MergeSortTest.benchMergeSortAsync avgt 25 1.534 +–’ 0.212 s/op 
[info] MergeSortTest.benchMergeSortSync avgt 25 2.325 +–’ 0.437 s/op 
[info] MergeSortTest.benchScalaSort  avgt 25 0.382 +–’ 0.006 s/op 

あなたはScalaのソートは、シーケンシャルマージソートよりもX6の倍高速である一方、並行バージョンを実行すると、より高速なシーケンシャルバージョンより×1.5程度であることをここで見ることができます。

これらのようなマイクロベンチマークを行う場合、考慮すべき多くの要素があることを覚えておく必要があります。 JMHがJVMランタイムに備わっている微妙な問題をJMHに処理させるのが通常です。

plugins.sbt:

addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.2.27") 

build.sbt:

enablePlugins(JmhPlugin) 

テストコード:

import java.util.concurrent.TimeUnit 

import org.openjdk.jmh.annotations._ 

import scala.concurrent.duration.Duration 
import scala.concurrent.{Await, Future} 
import scala.util.Random 
import scala.concurrent.ExecutionContext.Implicits.global 

/** 
    * Created by Yuval.Itzchakov on 21/08/2017. 
    */ 
@State(Scope.Thread) 
@Warmup(iterations = 3, time = 1) 
@Measurement(iterations = 5, timeUnit = TimeUnit.MILLISECONDS) 
@BenchmarkMode(Array(Mode.AverageTime)) 
@Fork(5) 
class MergeSortTest { 

    var seq: Seq[Double] = _ 

    @Setup 
    def setup(): Unit = { 
    seq = (1 to 1000000).map(i => Random.nextDouble()).toVector 
    } 

    lazy val regressThreadhold = 10000 

    def mergeSortedList[T](a: Seq[T], b: Seq[T])(implicit ord: Ordering[T]): Seq[T] = { 
    def loop(a: Seq[T], b: Seq[T], acc: Seq[T]): Seq[T] = { 
     if (a.isEmpty && b.isEmpty) acc 
     else if (a.isEmpty) b.reverse ++: acc 
     else if (b.isEmpty) a.reverse ++: acc 
     else if (ord.lt(a.head, b.head)) loop(a.tail, b, a.head +: acc) 
     else loop(a, b.tail, b.head +: acc) 
    } 

    loop(a, b, Seq()).reverse 
    } 

    def mergeSortAsync0[T](x: Seq[T])(implicit ord: Ordering[T]): Future[Seq[T]] = 
    if (x.size <= regressThreadhold) Future(mergeSort(x)) else { 
     val (left, right) = x.splitAt(x.size/2) 
     val Seq(leftSorted, rightSorted) = Seq(left, right).map(seq => Future(mergeSortAsync0(seq)).flatten) 
     leftSorted.zip(rightSorted).map(pair => mergeSortedList(pair._1, pair._2)) 
    } 

    def mergeSortAsync[T](x: Seq[T])(implicit ord: Ordering[T]): Seq[T] = 
    Await.result(mergeSortAsync0(x), Duration.Inf) 

    def mergeSort[T](x: Seq[T])(implicit ord: Ordering[T]): Seq[T] = 
    if (x.size <= 1) x else { 
     val (left, right) = x.splitAt(x.size/2) 
     val (leftSorted, rightSorted) = (mergeSort(left), mergeSort(right)) 
     mergeSortedList(leftSorted, rightSorted) 
    } 

    @Benchmark 
    def benchMergeSortSync(): Unit = { 
    mergeSort(seq) 
    } 

    @Benchmark 
    def benchMergeSortAsync(): Unit = { 
    mergeSortAsync(seq) 
    } 

    @Benchmark 
    def benchScalaSort(): Unit = { 
    seq.sorted 
    } 
} 
+1

ありがとうございます! –

+0

また、asyncはパフォーマンスのためではなく、スケーラビリティのためです:) –

関連する問題