2017-10-22 9 views
1

私はUDFを作成しようとしていますがスパークUDF - 私はこのコードを実行すると、タスクではないシリアライズ可能な例外

lazy val formattedDF = df.withColumn("result_col", validateudf(df("id"))) 

val validateudf = udf((id: Int) => { 

    if(id == 1){ 
    "ID IS EQUAL TO 1" 
    } 
    else if(id > 1){ 
    validateId(id) 
    } 
    else{ 
    "NO VALID RECORDS" 
    } 
}) 

def validateId(id:Int) : String = { 
    if (id > 2) { 
    "ID IS GREATER THAN 2" 
    } 
    else { 
    "VALID RECORDS" 
    } 
} 

以下Scalaのコードを使用して、私は仕事直列化可能ではない例外を取得しています。

すべてのアイデア?ありがとう。

+0

役立つことを願う - スレッド「メイン」org.apache.spark.SparkExceptionでスタックトレースとなど –

+0

例外:org.apacheでタスク直列化可能ではない \t .spark.util.ClosureCleaner $ .ensureSerializable(ClosureCleaner.scala:298) \t org.apache.spark.util.ClosureCleaner $ .org $ apache $ spark $ util $ ClosureCleaner $$ clean(ClosureCleaner.scala:288) – user7693121

+0

フルコードですか?あなたのコードにいくつかの閉鎖があるようです。 UDFを使用する場合、クロージャーに注意する必要があります。 –

答えて

1

udfは、渡された列を直列化および逆シリアル化しなければならないブラックボックスと見なされるため、組み込まれた機能を持つ代替品を使用するまで、udfの使用は推奨されません。 withColumnudf関数を呼び出す

は大丈夫ですが、問題の原因となったudf関数内から別の関数validateIdと呼ばれています。

udf機能は、when組み込み関数を使用するだけで、要件を満たすことができます。

import org.apache.spark.sql.functions._ 
val formattedDF2 = df.withColumn("result_col", when($"id" === 1, lit("ID IS EQUAL TO 1")).otherwise(when($"id" > 2, lit("ID IS GREATER THAN 2")).otherwise(when($"id" > 1, lit("VALID RECORDS")).otherwise(lit("NO VALID RECORDS"))))) 

私たちは例外に関する詳細な情報を必要とする答えが

+0

返信ありがとうございますが、私はvalidateIdメソッド内のidカラムに他にもいくつかのバリデーションを実装する予定です。 – user7693121

+0

@ user7693124あなたはwhenリストに追加することができます:) upvoteとacceptanceのおかげで –

関連する問題