2017-03-06 9 views
1

私は最近、local[1]モードでスパークを実行している場合や1つのエグゼキュータと1つのコアでYarnを使用している場合でも、UDF内で並列コンピューティング(たとえば並列コレクションを使用)を追加するとパフォーマンスが大幅に向上することを発見しました。はApache SparkのCPU使用量が制限されていますか?

など。 local[1]モードでは、Spark-Jobsはできるだけ多くのCPUを消費します(つまり、8コアの場合は800%、topを使用して測定)。

私はSpark(または糸)がSparkアプリケーションごとのCPU使用量を制限していると考えたので、これは奇妙に思えますか?

私はそれがなぜであり、それがスパークで並列処理/マルチスレッドを使用することを推奨するのか、スパーク並列化パターンに固執しなければならないのでしょうか?ここ

(1つのインスタンスおよび1つのコアと糸クライアントモードで測定回)で再生する例

case class MyRow(id:Int,data:Seq[Double]) 

// create dataFrame 
val rows = 10 
val points = 10000 
import scala.util.Random.nextDouble 
val data = {1 to rows}.map{i => MyRow(i, Stream.continually(nextDouble()).take(points))} 
val df = sc.parallelize(data).toDF().repartition($"id").cache() 

df.show() // trigger computation and caching 

// some expensive dummy-computation for each array-element 
val expensive = (d:Double) => (1 to 10000).foldLeft(0.0){case(a,b) => a*b}*d 

val serialUDF = udf((in:Seq[Double]) => in.map{expensive}.sum) 
val parallelUDF = udf((in:Seq[Double]) => in.par.map{expensive}.sum) 

df.withColumn("sum",serialUDF($"data")).show() // takes ~ 10 seconds 
df.withColumn("sum",parallelUDF($"data")).show() // takes ~ 2.5 seconds 
+0

糸クラスターで実行している場合、糸構成に基づいてコアの使用を制限できます。 – FaigB

+1

私はSparkがSparkプロセス(つまりタスクスケジューラ)によって作成されるスレッドの数を制限すると考えていますが、タスクに割り当てられたスレッドから新しいスレッドを作成するためにscalaの並列コレクションを停止することはできません。 – jamborta

答えて

1

スパーク直接CPUを限定するものではなく、代わりに、同時スレッドのスパークの数が作成定義します。したがって、ローカル[1]では、基本的に同時に1つのタスクを並列に実行します。 in.par.map {expensive}を実行しているときに、スパークするスレッドを管理していないため、この制限によって処理されないスレッドを作成しています。つまり、sparkに単一のスレッドに限定し、それを知ることなく他のスレッドを作成するように指示しました。

一般に、スパーク操作の中で並列スレッドを実行することはお勧めできません。代わりに、sparkにいくつのスレッドを使用できるかを伝え、並列性のために十分なパーティションがあることを確認する方が良いでしょう。

0

スパークは、CPU使用率の設定 examople

val conf = new SparkConf() 
      .setMaster("local[2]") 
      .setAppName("CountingSheep") 
val sc = new SparkContext(conf) 

変更ローカル[*]それは意志利用あなたのCPUコアのすべてをしています。

関連する問題