0
類似のデータを含む2つのデータソースがあり、それらをスカラスパークコードと比較したいと考えています。現在、私は以下のコードを持っていますが、SparkのUIで、rawData DataFrameが2回作成されていることがわかります。これは、40GBの未加工ファイルを取得しています。これはSparkUIのJob0とJob1のコードで確認できます。データを2回引き出すのを防ぐにはどうすればいいですか?複数のデータフレームで正しくスパークを使用していますか?DataFrameを2回作成するのを防ぐ方法複数のデータフレームを使用する
// Create the sql context.
val sqlContext = new SQLContext(context)
// Pull the data from database, then filter down to what would be outputting, and finally place it in a DataFrame.
val databaseDF: DataFrame = DataFrameUtils.getDataBaseDataInDataFrame.persist(StorageLevel.MEMORY_AND_DISK)
// Pull the data from the raw file into a DataFrame.
/** THIS IS CREATED TWICE, FROM WHAT I SEE IN THE SPARK UI **/
val rawDF: DataFrame = DataFrameUtils.getRawDataInDataFrame(sqlContext, filePath).persist(StorageLevel.MEMORY_AND_DISK)
// Grab the counts for the report using the DataFrames and comparing them.
val sourceTeacherCountDF: DataFrame = DataFrameUtils.getTeacherCountDF(rawDF).persist(StorageLevel.MEMORY_AND_DISK)
val TeacherCoverageCountDF: DataFrame = DataFrameUtils.getTeacherCoverageCountDF(gemDF).persist(StorageLevel.MEMORY_AND_DISK)
val classCountDF: DataFrame = DataFrameUtils.getclasstCountDF(gemDF).persist(StorageLevel.MEMORY_AND_DISK)
val falseNegativeDF: DataFrame = DataFrameUtils.getFalseNegativeCountDF(rawDF, gemDF).persist(StorageLevel.MEMORY_AND_DISK)
val falsePositiveDF: DataFrame = DataFrameUtils.getFalsePositivesCountDF(rawDF, gemDF, sqlContext).persist(StorageLevel.MEMORY_AND_DISK)
var report: DataFrame = rawDF.select(CLASS).unionAll(gemDF.select(CLASS)).distinct()
report = sqlContext.createDataFrame(report.map{case (Row(class: String)) =>
Row(iavm, lookupIavmTitle(class), lookupClassNum(class))}, lookupClassDesc)
val report: DataFrame = report.join(clasCountDF, Seq(class), "left")
.join(teacherCountDF, Seq(class), "left")
.join(teacherCoverageCountDF, Seq(class), "left")
.join(falseNegativeDF, Seq(class), "left")
.join(falsePositiveDF, Seq(class), "left").na.fill(0, report.columns)
report.write
.format("json")
.mode("overwrite")
.save(outputFileName)