2017-12-28 37 views
-1

私はUDFを作成しましたが、これを結合内の合体の結果に適用しようとしています。 理想的には私は参加時にこれを実行したいと思います:「タスクはシリアライズできません」という例外が発生したため、UDFによるクエリが失敗するのはなぜですか?

def foo(value: Double): Double = { 
    value/100 
} 

val foo = udf(foo _) 

df.join(.....) 
    .withColumn("value",foo(coalesce(new Column("valueA"), new Column("valueB")))) 

しかし、私は例外Task not serializableを取得しています。 これを回避する方法はありますか?

答えて

1

ラムダ関数を使用してシリアライズ可能にします。この例は正常に動作します。

import org.apache.spark.sql.functions.col 
import org.apache.spark.sql.functions.coalesce 
import org.apache.spark.sql.functions.udf 
val central: DataFrame = Seq(
    (1, Some(2014)), 
    (2, null) 
).toDF("key", "year1") 

val other1: DataFrame = Seq(
    (1, 2016), 
    (2, 2015) 
).toDF("key", "year2") 
def fooUDF = udf{v: Double => v/100} 

val result = central.join(other1, Seq("key")) 
    .withColumn("value",fooUDF(coalesce(col("year1"), col("year2")))) 
0

しかし、私は例外Task not serializableを取得しています。

悪名高い「タスクシリアライズしない」例外理由はdef foo(value: Double): Doubleは(間接的unserializable SparkContextを参照おそらくSparkSession有する)unserializable所有オブジェクトの一部であるということです。

解決策は、シリアル化できない値への参照を持たない「スタンドアロン」オブジェクトの一部としてメソッドを定義することです。

これを回避する方法はありますか?

@firasでother answerを参照してください。

関連する問題