2017-10-24 5 views
0

並列シーケンスをマッピングすると、スケーラは計画された各計算エレメントに固定CPUコアを割り当ててから、この初期割り当てが変更されないように見えます。 (a .. f)は非常に高速であり、(g .. l)は1時間毎に取る場合Scalaの並列シーケンスをマッピングするときの異種実行時間

List(a, b, c, d, e, f, g, h, i, j, k, l).par.map(someComputation) 

// (Simplified) initial assignment for a 2-cores machine: 
// Core 1: a, b, c, d, e, f 
// Core 2: g, h, i, j, k, l 

問題は、ある、我々はつもりコア2は、その仕事を成し遂げるしようとしている一方で、コア1が6時間アイドル状態になってしまいます。

コアXが自由であると、これらのジョブのいずれかでコアXを養う左未開始ジョブがある場合は、そのようなことを並列計算を行うための方法はありますか?

scala> :paste 

def compute(x: Int) = { 
    if (x > 10) 
    (0 to 10e9.toInt).foreach(n => n + 1) // loads 100% of a core for 4-5 seconds on a typical iMac 
    println(x) 
    x * 2 
} 
// hit Ctrl+D 
compute: (x: Int)Int 

scala> def foo = (0 to 20).toList.par.map(compute) 
foo: scala.collection.parallel.immutable.ParSeq[Int] 

scala> foo 

次に何が起こるかを参照してください、複製ScalaのREPLを起動し、入力する

EDIT

。開始時には、すべてのコアが100%であり、進歩するにつれて、各プロセスが開始される前に割り当てられた独自のコアを持つことがはっきりと分かります。他のコアが現在フリーであっても、コアを変更することはありません。最後に、キュー内の残りのすべてのジョブを1つまたは2つのコアで処理することは稀ではありません(最初の割り当てに依存します).3つ以上の他のコアがアイドル状態になっています。あなたはまた、ForkJoinTask APIを見てみることができます

val tasks = List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l") 
val resultFutures: List[Future[Something]] = tasks.map(t => Future { 
    someComputation(t) 
}(ExecutionContext.Implicits.global)) 

// wait for result somehow 
resultFutures.foreach(f => Await.result(f)) 

答えて

1

最も簡単な方法は、先物を使用することです。

1

parのために、それはdefaultTaskSupportを使用して、デフォルトだが、それはExecutionContext.Implicits.global

ForkJoinPoolによって実装このExecutionContextExecutorデフォルトで実装されています、それは、ワークスチールスレッドプール、アイドルスレッドがそう、忙しいスレッドからFutureTasksを盗むですOSすべてのプロセッサを使用する必要があります。

とあなたの例のために、私はあなたが確認すると(多分これが:) REPLのバグである)、これをテストするためにREPLを使用しないようにしようとするtasksupportを上書きしようとすることができると思います。 like:

val par: ParSeq[Int] = (0 to 20).toList.par 
    //par.tasksupport = new ForkJoinTaskSupport(new java.util.concurrent.ForkJoinPool(1)) // test run with one processor 
    //default it will use all processors of OS 
    par.tasksupport = new ForkJoinTaskSupport(new java.util.concurrent.ForkJoinPool()) 
    par.map(compute) 
関連する問題