をカスタムデータフレームクラスを使用している間、私はスカラ座/スパーク(1.5)、ツェッペリンとの奇妙な問題に直面していますタスク直列化可能ではない:Scalaの
私は、次のスカラ座/スパークコードを実行した場合、それが適切に実行されます:
// TEST NO PROBLEM SERIALIZATION
val rdd = sc.parallelize(Seq(1, 2, 3))
val testList = List[String]("a", "b")
rdd.map{a =>
val aa = testList(0)
None}
しかしhere
//DATAFRAME EXTENSION
import org.apache.spark.sql.DataFrame
object ExtraDataFrameOperations {
implicit class DFWithExtraOperations(df : DataFrame) {
//drop several columns
def drop(colToDrop:Seq[String]):DataFrame = {
var df_temp = df
colToDrop.foreach{ case (f: String) =>
df_temp = df_temp.drop(f)//can be improved with Spark 2.0
}
df_temp
}
}
}
を提案したように、カスタムデータフレームの種類を宣言し、次のような例のためにそれを使用した後:
//READ ALL THE FILES INTO different DF and save into map
import ExtraDataFrameOperations._
val filename = "myInput.csv"
val delimiter = ","
val colToIgnore = Seq("c_9", "c_10")
val inputICFfolder = "hdfs:///group/project/TestSpark/"
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("inferSchema", "false") // Automatically infer data types? => no cause we need to merge all df, with potential null values => keep string only
.option("delimiter", delimiter)
.option("charset", "UTF-8")
.load(inputICFfolder + filename)
.drop(colToIgnore)//call the customize dataframe
これは正常に実行されます。
RDD:org.apache.spark.rdd.RDD [INT] Iは、再び(同上)以下のコード
// TEST NO PROBLEM SERIALIZATION val rdd = sc.parallelize(Seq(1, 2, 3)) val testList = List[String]("a", "b") rdd.map{a => val aa = testList(0) None}
を実行する場合
は今はエラー・メッセージを取得します= 0の並列リストでのParallelCollectionRDD [8] での並列化:testList:List [String] = List(a、b) org.apache.spark.SparkException:タスクがシリアライズできない: org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala:304) org.apache.spark.util.ClosureCleaner $ .org $ apache $ spark $ util $ ClosureCleaner $$ clean(ClosureCleaner.scala:294) at org.apache.spark.util.ClosureCleaner $ .clean(ClosureCleaner.scala :122) at org.apache.spark.SparkContext.clean(SparkContext.scala:2032) org.apache.spark.rdd.RDD $$ anonfun $ map $ 1.apply(RDD.scala:314) .. 。 原因:java.io.NotSerializableException: $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ ExtraDataFrameOperations $ シリアライゼーションスタック: - 直列化できないオブジェクトクラス: $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ ExtraDataFrameOperations $、 値: $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ ExtraDataFrameOperatio ns $ @ 6c7e70e) - フィールド(クラス:$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC、名前:ExtraDataFrameOperations $モジュール、タイプ:クラス $ iwC iwC $$ iwC $$ iwC $$ iwC $$ iwC $ ExtraDataFrameOperations $) - オブジェクト(クラス$ $$ iwC $$ iwC $$ iwC、$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC @ 4c6d0802) - フィールド(クラス: $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC、名前:$ iw、種類:クラス $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ IWC $$ IWC) は...
私は理解していない:
- データフレーム上で操作が実行されていないときに、このエラーが発生したのはなぜですか?
- 「ExtraDataFrameOperations」が正常に使用されていたときにシリアル化できないのはなぜですか?
UPDATE:
が解決しない
@inline val testList = List[String]("a", "b")
にしようとしています。
残念ながら_ @ inline_は役に立ちません – user2573552
他のオブジェクトに関数/データを格納することは、データフレームオブジェクトをカスタマイズする戦略に実際に適合しません – user2573552