2017-08-10 13 views
0

私はカスタムudfを持っていて、sparkに登録しています。そのUDFにアクセスしようとすると、error.Unableにアクセスします。sparkでのUDFの使用

このようにしてみました。すべてのヘルプは理解されるであろう

最初(rssi_weightage($ "RSSI")// rssi_weightageが見つからないというエラーでエラーを表示

spark.udf.register("rssi_weightage", FilterMap.rssi_weightage) 
val filterop = input_data.groupBy($"tagShortID", $"Timestamp", $"ListenerShortID", $"rootOrgID", $"subOrgID").agg(first(rssi_weightage($"RSSI").as("RSSI_Weight"))) 

+0

私のソリューションはあなたの質問を解決しましたか?はい、答えが –

答えて

2

は、これはあなたがUDFを使用する方法を、実際ではありませんUDFはspark.udf.registerからの戻り値であるあなたがすることができるので:。

val udf_rssii_weightage = spark.udf.register("rssi_weightage", FilterMap.rssi_weightage) 

val filterop = input_data.groupBy($"tagShortID", $"Timestamp", $"ListenerShortID", $"rootOrgID", $"subOrgID").agg(first(udf_rssi_weightage($"RSSI")).as("RSSI_Weight")) 

しかし、あなたの場合にはあなただけのReguを変換するorg.apache.spark.sql.functions.udfを使用して、UDFを登録する必要はありませんがUDFにLAR機能:

val udf_rssii_weightage = udf(FilterMap.rssi_weightage) 
+0

の場合はお返事ありがとうございます... –

+0

良い1つ:@ラファエル、upvoteにふさわしい –

1

私は次のスナップショットが発表UDFにわずかに異なるアプローチを持っている 、あなたがUDF関数を定義している方法で問題があるとします - それは直接の関数定義されています: インポートorg.apache.spark.sql.functions._

val data = sqlContext.read.json(sc.parallelize(Seq("{'foo' : 'Bar'}", "{'foo': 'Baz'}"))) 

val example = Seq("Bar", "Bazzz") 
val urbf = udf { foo: String => if (example.contains(example)) 1 else 0 } 

data.select($"foo", urbf($"foo")).show 

+--------+-------------+ 
| foo |UDF(foo)  | 
+--------+-------------+ 
| Bar |   1| 
| Bazzz |   0| 
+--------+-------------+ 
+0

あなたの解決のおかげで... –

関連する問題