私はUDFを作成しようとしていますがスパークUDF - 私はこのコードを実行すると、タスクではないシリアライズ可能な例外
lazy val formattedDF = df.withColumn("result_col", validateudf(df("id")))
val validateudf = udf((id: Int) => {
if(id == 1){
"ID IS EQUAL TO 1"
}
else if(id > 1){
validateId(id)
}
else{
"NO VALID RECORDS"
}
})
def validateId(id:Int) : String = {
if (id > 2) {
"ID IS GREATER THAN 2"
}
else {
"VALID RECORDS"
}
}
以下Scalaのコードを使用して、私は仕事直列化可能ではない例外を取得しています。
すべてのアイデア?ありがとう。
役立つことを願う - スレッド「メイン」org.apache.spark.SparkExceptionでスタックトレースとなど –
例外:org.apacheでタスク直列化可能ではない \t .spark.util.ClosureCleaner $ .ensureSerializable(ClosureCleaner.scala:298) \t org.apache.spark.util.ClosureCleaner $ .org $ apache $ spark $ util $ ClosureCleaner $$ clean(ClosureCleaner.scala:288) – user7693121
フルコードですか?あなたのコードにいくつかの閉鎖があるようです。 UDFを使用する場合、クロージャーに注意する必要があります。 –