SparkのDataFramesで作業する場合、列のデータをマッピングするにはUser Defined Functions(UDF)が必要です。 UDFでは、引数タイプを明示的に指定する必要があります。私の場合は、オブジェクトの配列からなる列を操作する必要があります。使用する型はわかりません。ここでは例です:Spark DataFrameのオブジェクトの配列を受け入れるUDFを定義しますか?
import sqlContext.implicits._
// Start with some data. Each row (here, there's only one row)
// is a topic and a bunch of subjects
val data = sqlContext.read.json(sc.parallelize(Seq(
"""
|{
| "topic" : "pets",
| "subjects" : [
| {"type" : "cat", "score" : 10},
| {"type" : "dog", "score" : 1}
| ]
|}
""")))
それは
import org.apache.spark.sql.functions.size
data.select($"topic", size($"subjects")).show
+-----+--------------+
|topic|size(subjects)|
+-----+--------------+
| pets| 2|
+-----+--------------+
列のデータに対して基本的な操作を実行するためにorg.apache.spark.sql.functions
を内蔵し、それが任意の操作を実行するために、カスタムUDFを書くために、一般的に簡単ですを使用するのは比較的簡単です
import org.apache.spark.sql.functions.udf
val enhance = udf { topic : String => topic.toUpperCase() }
data.select(enhance($"topic"), size($"subjects")).show
+----------+--------------+
|UDF(topic)|size(subjects)|
+----------+--------------+
| PETS| 2|
+----------+--------------+
しかし、 "科目"列のオブジェクトの配列を操作するためにUDFを使用したいのですが? UDFの引数にはどのような型を使用しますか?私が代わりに火花が提供するものを使用しての、サイズの機能を再実装する場合たとえば、:
val my_size = udf { subjects: Array[Something] => subjects.size }
data.select($"topic", my_size($"subjects")).show
明らかにArray[Something]
は動作しません...私はどのようなタイプを使用する必要があります!私はArray[]
を完全に捨てるべきですか?私の言うところによると、scala.collection.mutable.WrappedArray
には何か関係があるかもしれませんが、それでも私が提供する必要のあるタイプがあります。あなたが探していることはSeq[o.a.s.sql.Row]
ある
私はこれを取得: java.lang.UnsupportedOperationExceptionが:型org.apache.spark.sql.Rowのスキーマがorg.apache.spark.sql.catalyst.ScalaReflection $ .schemaFor(ScalaReflectionで サポートされていません。 (ScalaReflection.scala:671) at org.apache.spark.sql.functions $ .udf(functions.scala:3076) 。 .. 134 elided –
@GuruprasadGV UDFは 'struct'の' Product'( 'TupleN'、caseクラス)を返さなければなりません。 – zero323