2017-09-10 7 views
0

列を変換したい。新しい列には、元の列のパーティションのみを含める必要があります。元のベクトル列がで作成されたSpark DataFrame UDFパーティショニング列

myDF = myDF.withColumn("measurement_"+i,extract(i)($"vector")) 

して後輪でそれを使用するには

def extract (index : Integer) = udf((v: Seq[Double]) => v.grouped(16).toSeq(index)) 

var vectors :Seq[Seq[Double]] = myVectors 
vectors.toDF("vector") 

しかし、最終的に私が得る私は、次のUDFを定義し次のエラー:

Failed to execute user defined function(anonfun$user$sparkapp$MyClass$$extract$2$1: (array<double>) => array<double>) 

udfを間違って定義しましたか?

答えて

1

私は存在しない要素を抽出しようとすると、エラーを再現すなわち、配列の長さよりも大きいインデックスを与えることができます:

val myDF = Seq(Seq(1.0, 2.0 ,3, 4.0), Seq(4.0,3,2,1)).toDF("vector") 
myDF: org.apache.spark.sql.DataFrame = [vector: array<double>] 

def extract (index : Integer) = udf((v: Seq[Double]) => v.grouped(2).toSeq(index)) 
// extract: (index: Integer)org.apache.spark.sql.expressions.UserDefinedFunction 

val i = 2 

myDF.withColumn("measurement_"+i,extract(i)($"vector")).show 

は、このエラーを与える:

org.apache.spark.SparkException: Failed to execute user defined function($anonfun$extract$1: (array<double>) => array<double>) 

toSeq(index)をしている間、あなたが同じ問題を抱えている

ほとんどの場合、インデックスがバウンド外の場合はNoneを返すtoSeq.lift(index)を使用してみてください

def extract (index : Integer) = udf((v: Seq[Double]) => v.grouped(2).toSeq.lift(index)) 
extract: (index: Integer)org.apache.spark.sql.expressions.UserDefinedFunction 

ノーマルインデックス

val i = 1  
myDF.withColumn("measurement_"+i,extract(i)($"vector")).show 
+--------------------+-------------+ 
|    vector|measurement_1| 
+--------------------+-------------+ 
|[1.0, 2.0, 3.0, 4.0]| [3.0, 4.0]| 
|[4.0, 3.0, 2.0, 1.0]| [2.0, 1.0]| 
+--------------------+-------------+ 

インデックスバウンドのうち:

val i = 2 
myDF.withColumn("measurement_"+i,extract(i)($"vector")).show 
+--------------------+-------------+ 
|    vector|measurement_2| 
+--------------------+-------------+ 
|[1.0, 2.0, 3.0, 4.0]|   null| 
|[4.0, 3.0, 2.0, 1.0]|   null| 
+--------------------+-------------+ 
+1

どうもありがとう、このエラーのデバッグは私に多くの時間を要しています。あなたの詳細な答えは+1! – user4054919