2016-08-03 3 views
0

私はHDFSに保存するデータフレームを作成するスパークジョブを持っています。私がしたいのは、そのデータフレームのサブセットを別の場所に保存することですが、私はこれについて演奏したいと思います。複数のデータフレームを保存する

私が持っている唯一の変革は、節約そのものです...スパークジョブのコードのすべての要素がアクションです。私はデータフレームをキャッシュしません。私は、古いものから新しいデータフレームへのドロップアクションの作成は、元のデータフレームの変換をすべて再度受けることに懸念しています。例えば

は、私のようなものがあります:

val df = hiveContext.read.json("hdfs://HOSTNAME:PORT/user/spark/data/in/*") 

val df2 = df.withColumn("new_column", some_udf("old_column")).drop("old_column") 
. 
. 
. 

val final_df = df10.withColumn("newest_column", another_udf("old_column2")).drop("old_column2") 

val subset_df = final_df.drop("this_column") 
         .drop("that_column") 
         .drop("another_column) 

final_df.write.mode(SaveMode.Overwrite).format("json").save(hdfs_dir) 
subset_df.write.mode(SaveMode.Overwrite).format("json").save(hdfs_dir2) 

をしかしsome_udfが実際に本当に集中的に計算されることを前提としています。私はそれを2回実行したくありません。したがって、私の質問は:

私はfinal_df.cache()私はsubset_dfを宣言し、呼び出すべき前に、それは再びUDFの変換を実行しないことを確認して保存しますか?あなたがキャッシュすべき

val df = hiveContext.read.json("hdfs://HOSTNAME:PORT/user/spark/data/in/*") 

val df2 = df.withColumn("new_column", some_udf("old_column")).drop("old_column") 
. 
. 
. 

val final_df = df10.withColumn("newest_column", another_udf("old_column2")).drop("old_column2") 

val subset_df = final_df.drop("this_column") 
         .drop("that_column") 
         .drop("another_column) 

final_df.cache() // This is the only new line 

final_df.write.mode(SaveMode.Overwrite).format("json").save(hdfs_dir) 
subset_df.write.mode(SaveMode.Overwrite).format("json").save(hdfs_dir2) 

答えて

1

:最初のアクションの前に

val final_df = df10.withColumn(...) 
val subset_df = final_df.drop(...) 
final_df.cache() 

を、それ以外の場合は二回(あなたが疑われるとして)実行されるよう

何か。

+0

"Will"が2回実行されるか、 "could"が2回実行されるか?私の印象は後者のことです。 –

+0

@MatthewGraves私はそれがそうなると思います。なぜそれは一度だけ実行されますか? –

+0

私はdf2.cacheを最初のudfの後に置くべきでしょうか?これはまだsaveまたはcacheまで呼び出されないアクションです。あるいは、すべてのアクションの宣言後ではなく、最初の変換を呼び出す前にfinal_df.cache()を行う必要がありますか?言い換えれば、どこにキャッシュする必要がありますか? – tadamhicks

関連する問題