2017-11-28 4 views
0

類似のデータを含む2つのデータソースがあり、それらをスカラスパークコードと比較したいと考えています。現在、私は以下のコードを持っていますが、SparkのUIで、rawData DataFrameが2回作成されていることがわかります。これは、40GBの未加工ファイルを取得しています。これはSparkUIのJob0とJob1のコードで確認できます。データを2回引き出すのを防ぐにはどうすればいいですか?複数のデータフレームで正しくスパークを使用していますか?DataFrameを2回作成するのを防ぐ方法複数のデータフレームを使用する

// Create the sql context. 
    val sqlContext = new SQLContext(context) 

    // Pull the data from database, then filter down to what would be outputting, and finally place it in a DataFrame. 
    val databaseDF: DataFrame = DataFrameUtils.getDataBaseDataInDataFrame.persist(StorageLevel.MEMORY_AND_DISK) 

    // Pull the data from the raw file into a DataFrame. 
/** THIS IS CREATED TWICE, FROM WHAT I SEE IN THE SPARK UI **/ 
    val rawDF: DataFrame = DataFrameUtils.getRawDataInDataFrame(sqlContext, filePath).persist(StorageLevel.MEMORY_AND_DISK) 

    // Grab the counts for the report using the DataFrames and comparing them. 
    val sourceTeacherCountDF: DataFrame = DataFrameUtils.getTeacherCountDF(rawDF).persist(StorageLevel.MEMORY_AND_DISK) 
    val TeacherCoverageCountDF: DataFrame = DataFrameUtils.getTeacherCoverageCountDF(gemDF).persist(StorageLevel.MEMORY_AND_DISK) 
    val classCountDF: DataFrame = DataFrameUtils.getclasstCountDF(gemDF).persist(StorageLevel.MEMORY_AND_DISK) 
    val falseNegativeDF: DataFrame = DataFrameUtils.getFalseNegativeCountDF(rawDF, gemDF).persist(StorageLevel.MEMORY_AND_DISK) 
    val falsePositiveDF: DataFrame = DataFrameUtils.getFalsePositivesCountDF(rawDF, gemDF, sqlContext).persist(StorageLevel.MEMORY_AND_DISK) 


    var report: DataFrame = rawDF.select(CLASS).unionAll(gemDF.select(CLASS)).distinct() 
    report = sqlContext.createDataFrame(report.map{case (Row(class: String)) => 
     Row(iavm, lookupIavmTitle(class), lookupClassNum(class))}, lookupClassDesc) 

    val report: DataFrame = report.join(clasCountDF, Seq(class), "left") 
     .join(teacherCountDF, Seq(class), "left") 
     .join(teacherCoverageCountDF, Seq(class), "left") 
     .join(falseNegativeDF, Seq(class), "left") 
      .join(falsePositiveDF, Seq(class), "left").na.fill(0, report.columns) 

report.write 
     .format("json") 
     .mode("overwrite") 
     .save(outputFileName) 

答えて

0

それが二回作成された理由は、私はあなたがrawDFを持続するが、スパークに固執している見ることがfalseNegetiveDFfalsePositiveDF

を作成するために二回使用されている

val falseNegativeDF: DataFrame = DataFrameUtils.getFalseNegativeCountDF(rawDF, gemDF).persist(StorageLevel.MEMORY_AND_DISK) 
val falsePositiveDF: DataFrame = DataFrameUtils.getFalsePositivesCountDF(rawDF, gemDF, sqlContext).persist(StorageLevel.MEMORY_AND_DISK) 

でもあるということです怠惰な操作。したがって、rawDFを2回生成したくない場合は、rawDFで何らかのアクションを実行して計算を覚える必要がありますrawDF.count

関連する問題