Sparkのドキュメントでは、RDDアクションのみがSparkジョブをトリガでき、アクションが呼び出されると変換が遅延評価されます。sortBy変換がSparkジョブをトリガーするのはなぜですか?
変換機能はすぐに適用され、SparkUIでジョブトリガーとして表示されます。どうして?
Sparkのドキュメントでは、RDDアクションのみがSparkジョブをトリガでき、アクションが呼び出されると変換が遅延評価されます。sortBy変換がSparkジョブをトリガーするのはなぜですか?
変換機能はすぐに適用され、SparkUIでジョブトリガーとして表示されます。どうして?
sortBy
は、(JVM)またはパーティション機能(Python)に依存するsortByKey
を使用して実装されています。 sortBy
/sortByKey
と呼ぶと、パーティショナー(パーティショニング機能)は熱心に初期化され、入力RDDをサンプリングしてパーティション境界を計算します。あなたが見る仕事はこのプロセスに対応しています。
実際に並べ替えは、新しく作成されたRDD
またはその子孫に対してアクションを実行する場合にのみ実行されます。
Sparkのドキュメントによると、アクションはSparkでジョブをトリガするだけで、アクションはそのアクションが呼び出されるときに遅延評価されます。
は、一般的にはあなたは正しいが、あなたがちょうど経験してきたように、いくつかの例外があるとsortBy
は(zipWithIndex
で)それらの間です。
実際には、SparkのJIRAで報告され、Will not Fixの解決で閉じられました。 SPARK-1021 sortByKey() launches a cluster job when it shouldn'tを参照してください。
あなたは仕事が(後でウェブUIにと)DAGScheduler
ログを有効にして実行している見ることができます:
scala> sc.parallelize(0 to 8).sortBy(identity)
INFO DAGScheduler: Got job 1 (sortBy at <console>:25) with 8 output partitions
INFO DAGScheduler: Final stage: ResultStage 1 (sortBy at <console>:25)
INFO DAGScheduler: Parents of final stage: List()
INFO DAGScheduler: Missing parents: List()
DEBUG DAGScheduler: submitStage(ResultStage 1)
DEBUG DAGScheduler: missing: List()
INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at sortBy at <console>:25), which has no missing parents
DEBUG DAGScheduler: submitMissingTasks(ResultStage 1)
INFO DAGScheduler: Submitting 8 missing tasks from ResultStage 1 (MapPartitionsRDD[4] at sortBy at <console>:25)
DEBUG DAGScheduler: New pending partitions: Set(0, 1, 5, 2, 6, 3, 7, 4)
INFO DAGScheduler: ResultStage 1 (sortBy at <console>:25) finished in 0.013 s
DEBUG DAGScheduler: After removal of stage 1, remaining stages = 0
INFO DAGScheduler: Job 1 finished: sortBy at <console>:25, took 0.019755 s
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at sortBy at <console>:25