2016-07-19 10 views
0
スパークで

をカスタムデータフレームクラスを使用している間、私はスカラ座/スパーク(1.5)、ツェッペリンとの奇妙な問題に直面していますタスク直列化可能ではない:Scalaの

私は、次のスカラ座/スパークコードを実行した場合、それが適切に実行されます:

// TEST NO PROBLEM SERIALIZATION 
val rdd = sc.parallelize(Seq(1, 2, 3)) 
val testList = List[String]("a", "b") 

rdd.map{a => 
    val aa = testList(0) 
    None} 

しかしhere

//DATAFRAME EXTENSION 
import org.apache.spark.sql.DataFrame 

object ExtraDataFrameOperations { 
    implicit class DFWithExtraOperations(df : DataFrame) { 

    //drop several columns 
    def drop(colToDrop:Seq[String]):DataFrame = { 
     var df_temp = df 
     colToDrop.foreach{ case (f: String) => 
      df_temp = df_temp.drop(f)//can be improved with Spark 2.0 
     } 
     df_temp 
    } 
    } 
} 

を提案したように、カスタムデータフレームの種類を宣言し、次のような例のためにそれを使用した後:

//READ ALL THE FILES INTO different DF and save into map 
import ExtraDataFrameOperations._ 
val filename = "myInput.csv" 

val delimiter = "," 

val colToIgnore = Seq("c_9", "c_10") 

val inputICFfolder = "hdfs:///group/project/TestSpark/" 

val df = sqlContext.read 
      .format("com.databricks.spark.csv") 
      .option("header", "true") // Use first line of all files as header 
      .option("inferSchema", "false") // Automatically infer data types? => no cause we need to merge all df, with potential null values => keep string only 
      .option("delimiter", delimiter) 
      .option("charset", "UTF-8") 
      .load(inputICFfolder + filename) 
      .drop(colToIgnore)//call the customize dataframe 

これは正常に実行されます。

RDD:org.apache.spark.rdd.RDD [INT] Iは、再び(同上)以下のコード

// TEST NO PROBLEM SERIALIZATION 
val rdd = sc.parallelize(Seq(1, 2, 3)) 
val testList = List[String]("a", "b") 
rdd.map{a => 
    val aa = testList(0) 
    None} 

を実行する場合

は今はエラー・メッセージを取得します= 0の並列リストでのParallelCollectionRDD [8] での並列化:testList:List [String] = List(a、b) org.apache.spark.SparkException:タスクがシリアライズできない: org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala:304) org.apache.spark.util.ClosureCleaner $ .org $ apache $ spark $ util $ ClosureCleaner $$ clean(ClosureCleaner.scala:294) at org.apache.spark.util.ClosureCleaner $ .clean(ClosureCleaner.scala :122) at org.apache.spark.SparkContext.clean(SparkContext.scala:2032) org.apache.spark.rdd.RDD $$ anonfun $ map $ 1.apply(RDD.scala:314) .. 。 原因:java.io.NotSerializableException: $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ ExtraDataFrameOperations $ シリアライゼーションスタック: - 直列化できないオブジェクトクラス: $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ ExtraDataFrameOperations $、 値: $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ ExtraDataFrameOperatio ns $ @ 6c7e70e) - フィールド(クラス:$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC、名前:ExtraDataFrameOperations $モジュール、タイプ:クラス $ iwC iwC $$ iwC $$ iwC $$ iwC $$ iwC $ ExtraDataFrameOperations $) - オブジェクト(クラス$ $$ iwC $$ iwC $$ iwC、$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC @ 4c6d0802) - フィールド(クラス: $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC、名前:$ iw、種類:クラス $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ IWC $$ IWC) は...

私は理解していない:

  • データフレーム上で操作が実行されていないときに、このエラーが発生したのはなぜですか?
  • 「ExtraDataFrameOperations」が正常に使用されていたときにシリアル化できないのはなぜですか?

UPDATE:

が解決しない

@inline val testList = List[String]("a", "b") 

にしようとしています。

答えて

0

スパークがtestListの周りのすべての範囲をシリアル化しようとしているようです。 データ@inline val testList = List[String]("a", "b")をインライン化するか、ドライバに渡す関数/データを格納する別のオブジェクトを使用してください。

+0

残念ながら_ @ inline_は役に立ちません – user2573552

+0

他のオブジェクトに関数/データを格納することは、データフレームオブジェクトをカスタマイズする戦略に実際に適合しません – user2573552