2016-12-30 1 views

答えて

3

sortByは、(JVM)またはパーティション機能(Python)に依存するsortByKeyを使用して実装されています。 sortBy/sortByKeyと呼ぶと、パーティショナー(パーティショニング機能)は熱心に初期化され、入力RDDをサンプリングしてパーティション境界を計算します。あなたが見る仕事はこのプロセスに対応しています。

実際に並べ替えは、新しく作成されたRDDまたはその子孫に対してアクションを実行する場合にのみ実行されます。

1

Sparkのドキュメントによると、アクションはSparkでジョブをトリガするだけで、アクションはそのアクションが呼び出されるときに遅延評価されます。

は、一般的にはあなたは正しいが、あなたがちょうど経験してきたように、いくつかの例外があるとsortByは(zipWithIndexで)それらの間です。

実際には、SparkのJIRAで報告され、Will not Fixの解決で閉じられました。 SPARK-1021 sortByKey() launches a cluster job when it shouldn'tを参照してください。

あなたは仕事が(後でウェブUIにと)DAGSchedulerログを有効にして実行している見ることができます:

scala> sc.parallelize(0 to 8).sortBy(identity) 
INFO DAGScheduler: Got job 1 (sortBy at <console>:25) with 8 output partitions 
INFO DAGScheduler: Final stage: ResultStage 1 (sortBy at <console>:25) 
INFO DAGScheduler: Parents of final stage: List() 
INFO DAGScheduler: Missing parents: List() 
DEBUG DAGScheduler: submitStage(ResultStage 1) 
DEBUG DAGScheduler: missing: List() 
INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at sortBy at <console>:25), which has no missing parents 
DEBUG DAGScheduler: submitMissingTasks(ResultStage 1) 
INFO DAGScheduler: Submitting 8 missing tasks from ResultStage 1 (MapPartitionsRDD[4] at sortBy at <console>:25) 
DEBUG DAGScheduler: New pending partitions: Set(0, 1, 5, 2, 6, 3, 7, 4) 
INFO DAGScheduler: ResultStage 1 (sortBy at <console>:25) finished in 0.013 s 
DEBUG DAGScheduler: After removal of stage 1, remaining stages = 0 
INFO DAGScheduler: Job 1 finished: sortBy at <console>:25, took 0.019755 s 
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at sortBy at <console>:25 
関連する問題