0
私はHDFSに保存するデータフレームを作成するスパークジョブを持っています。私がしたいのは、そのデータフレームのサブセットを別の場所に保存することですが、私はこれについて演奏したいと思います。複数のデータフレームを保存する
私が持っている唯一の変革は、節約そのものです...スパークジョブのコードのすべての要素がアクションです。私はデータフレームをキャッシュしません。私は、古いものから新しいデータフレームへのドロップアクションの作成は、元のデータフレームの変換をすべて再度受けることに懸念しています。例えば
は、私のようなものがあります:
val df = hiveContext.read.json("hdfs://HOSTNAME:PORT/user/spark/data/in/*")
val df2 = df.withColumn("new_column", some_udf("old_column")).drop("old_column")
.
.
.
val final_df = df10.withColumn("newest_column", another_udf("old_column2")).drop("old_column2")
val subset_df = final_df.drop("this_column")
.drop("that_column")
.drop("another_column)
final_df.write.mode(SaveMode.Overwrite).format("json").save(hdfs_dir)
subset_df.write.mode(SaveMode.Overwrite).format("json").save(hdfs_dir2)
をしかしsome_udf
が実際に本当に集中的に計算されることを前提としています。私はそれを2回実行したくありません。したがって、私の質問は:
私はfinal_df.cache()
私はsubset_df
を宣言し、呼び出すべき前に、それは再びUDFの変換を実行しないことを確認して保存しますか?あなたがキャッシュすべき
val df = hiveContext.read.json("hdfs://HOSTNAME:PORT/user/spark/data/in/*")
val df2 = df.withColumn("new_column", some_udf("old_column")).drop("old_column")
.
.
.
val final_df = df10.withColumn("newest_column", another_udf("old_column2")).drop("old_column2")
val subset_df = final_df.drop("this_column")
.drop("that_column")
.drop("another_column)
final_df.cache() // This is the only new line
final_df.write.mode(SaveMode.Overwrite).format("json").save(hdfs_dir)
subset_df.write.mode(SaveMode.Overwrite).format("json").save(hdfs_dir2)
"Will"が2回実行されるか、 "could"が2回実行されるか?私の印象は後者のことです。 –
@MatthewGraves私はそれがそうなると思います。なぜそれは一度だけ実行されますか? –
私はdf2.cacheを最初のudfの後に置くべきでしょうか?これはまだsaveまたはcacheまで呼び出されないアクションです。あるいは、すべてのアクションの宣言後ではなく、最初の変換を呼び出す前にfinal_df.cache()を行う必要がありますか?言い換えれば、どこにキャッシュする必要がありますか? – tadamhicks