2016-08-17 4 views
13

SparkのDataFramesで作業する場合、列のデータをマッピングするにはUser Defined Functions(UDF)が必要です。 UDFでは、引数タイプを明示的に指定する必要があります。私の場合は、オブジェクトの配列からなる列を操作する必要があります。使用する型はわかりません。ここでは例です:Spark DataFrameのオブジェクトの配列を受け入れるUDFを定義しますか?

import sqlContext.implicits._ 

// Start with some data. Each row (here, there's only one row) 
// is a topic and a bunch of subjects 
val data = sqlContext.read.json(sc.parallelize(Seq(
    """ 
    |{ 
    | "topic" : "pets", 
    | "subjects" : [ 
    | {"type" : "cat", "score" : 10}, 
    | {"type" : "dog", "score" : 1} 
    | ] 
    |} 
    """))) 

それは

import org.apache.spark.sql.functions.size 
data.select($"topic", size($"subjects")).show 

+-----+--------------+ 
|topic|size(subjects)| 
+-----+--------------+ 
| pets|    2| 
+-----+--------------+ 

列のデータに対して基本的な操作を実行するためにorg.apache.spark.sql.functionsを内蔵し、それが任意の操作を実行するために、カスタムUDFを書くために、一般的に簡単ですを使用するのは比較的簡単です

import org.apache.spark.sql.functions.udf 
val enhance = udf { topic : String => topic.toUpperCase() } 
data.select(enhance($"topic"), size($"subjects")).show 

+----------+--------------+ 
|UDF(topic)|size(subjects)| 
+----------+--------------+ 
|  PETS|    2| 
+----------+--------------+ 

しかし、 "科目"列のオブジェクトの配列を操作するためにUDFを使用したいのですが? UDFの引数にはどのような型を使用しますか?私が代わりに火花が提供するものを使用しての、サイズの機能を再実装する場合たとえば、:

val my_size = udf { subjects: Array[Something] => subjects.size } 
data.select($"topic", my_size($"subjects")).show 

明らかにArray[Something]は動作しません...私はどのようなタイプを使用する必要があります!私はArray[]を完全に捨てるべきですか?私の言うところによると、scala.collection.mutable.WrappedArrayには何か関係があるかもしれませんが、それでも私が提供する必要のあるタイプがあります。あなたが探していることはSeq[o.a.s.sql.Row]ある

答えて

16

import org.apache.spark.sql.Row 

val my_size = udf { subjects: Seq[Row] => subjects.size } 

説明:あなたが既に知っているようArrayType

  • 現在の表現は、ある、WrappedArrayのでArrayが動作し、それはありません安全面にとどまる方が良いです。
  • StructTypeのローカルタイプはです。残念ながら、個々のフィールドへのアクセスはタイプセーフではありません。ノート

  • udfに渡されたstruct関数を作成するには、Productタイプ(Tuple*またはcase class)、ないを返すことがあります。
+0

私はこれを取得: java.lang.UnsupportedOperationExceptionが:型org.apache.spark.sql.Rowのスキーマがorg.apache.spark.sql.catalyst.ScalaReflection $ .schemaFor(ScalaReflectionで サポートされていません。 (ScalaReflection.scala:671) at org.apache.spark.sql.functions $ .udf(functions.scala:3076) 。 .. 134 elided –

+0

@GuruprasadGV UDFは 'struct'の' Product'( 'TupleN'、caseクラス)を返さなければなりません。 – zero323

関連する問題