私は最近、local[1]
モードでスパークを実行している場合や1つのエグゼキュータと1つのコアでYarnを使用している場合でも、UDF内で並列コンピューティング(たとえば並列コレクションを使用)を追加するとパフォーマンスが大幅に向上することを発見しました。はApache SparkのCPU使用量が制限されていますか?
など。 local[1]
モードでは、Spark-Jobsはできるだけ多くのCPUを消費します(つまり、8コアの場合は800%、top
を使用して測定)。
私はSpark(または糸)がSparkアプリケーションごとのCPU使用量を制限していると考えたので、これは奇妙に思えますか?
私はそれがなぜであり、それがスパークで並列処理/マルチスレッドを使用することを推奨するのか、スパーク並列化パターンに固執しなければならないのでしょうか?ここ
(1つのインスタンスおよび1つのコアと糸クライアントモードで測定回)で再生する例
case class MyRow(id:Int,data:Seq[Double])
// create dataFrame
val rows = 10
val points = 10000
import scala.util.Random.nextDouble
val data = {1 to rows}.map{i => MyRow(i, Stream.continually(nextDouble()).take(points))}
val df = sc.parallelize(data).toDF().repartition($"id").cache()
df.show() // trigger computation and caching
// some expensive dummy-computation for each array-element
val expensive = (d:Double) => (1 to 10000).foldLeft(0.0){case(a,b) => a*b}*d
val serialUDF = udf((in:Seq[Double]) => in.map{expensive}.sum)
val parallelUDF = udf((in:Seq[Double]) => in.par.map{expensive}.sum)
df.withColumn("sum",serialUDF($"data")).show() // takes ~ 10 seconds
df.withColumn("sum",parallelUDF($"data")).show() // takes ~ 2.5 seconds
糸クラスターで実行している場合、糸構成に基づいてコアの使用を制限できます。 – FaigB
私はSparkがSparkプロセス(つまりタスクスケジューラ)によって作成されるスレッドの数を制限すると考えていますが、タスクに割り当てられたスレッドから新しいスレッドを作成するためにscalaの並列コレクションを停止することはできません。 – jamborta