2017-06-05 27 views
0

私が持っている以下のUDFの実行に失敗しました:は、ユーザー定義関数

val jac_index:(Array[String],Array[String])=>Float=(Sq1:Array[String],Sq2:Array[String])=> 
{ 
    val Sq3=Sq1.intersect(Sq2) 
    val Sq4=Sq1.union(Sq2).distinct 
    if (!Sq4.isEmpty) Sq3.length.toFloat/Sq4.length.toFloat else 0F 
} 
val jacUDF=udf(jac_index) 

を、私は次の文を実行したとき

val movie_jac_df=movie_pairs_df.withColumn("jac",jacUDF(movie_pairs_df("name"),movie_pairs_df("name2"))) 

は私が

「ユーザー定義関数の実行に失敗しました」エラーが出ます

movie_pairs_dfのスキーマ

root 
|-- movie: string (nullable = true) 
|-- name: array (nullable = true) 
| |-- element: string (containsNull = true) 
|-- movie2: string (nullable = true) 
|-- name2: array (nullable = true) 
| |-- element: string (containsNull = true) 

原因は何ですか?

答えて

1

SparkのDataFramesモデル配列列はmutable.WrappedArrayです。つまり、UDFは入力として2つのWrappedArraysを使用する必要があります。あなたは、2つのこのような配列を期待するjac_indexを変更した場合

は:予想通り

import scala.collection.mutable 

val jac_index: (mutable.WrappedArray[String], mutable.WrappedArray[String]) => Float = 
    (Sq1, Sq2) => { /* same implementation */ } 

これは動作します。

+0

以下のようにUDFを定義する、ありがとう! – leonfrank

0

それが動作

val jacUDF = udf((Sq1:mutable.WrappedArray[String], Sq2:mutable.WrappedArray[String]) => { 
    val Sq3=Sq1.intersect(Sq2) 
    val Sq4=Sq1.union(Sq2).distinct 
    if (!Sq4.isEmpty) Sq3.length.toFloat/Sq4.length.toFloat else 0F 
}) 
関連する問題