2017-12-22 21 views
1

私はDataframe内で呼び出すUDFを持っていますが、udfは未定義です。 exprで使用するpysparkの未定義関数UDF?

global ac 
ac = sc.accumulator(0) 

def incrementAC(): 
    ac.add(1) 
    return str(ac.value) 

df = sc.parallelize([('Java',90),('Scala',95),('Spark',92)]).toDF(["language","rank"]) 

df.withColumn("lang_and_rank", expr("concat(language,'blah')")).show() 

+--------+----+-------------+ 
|language|rank|lang_and_rank| 
+--------+----+-------------+ 
| Java| 90|  Javablah| 
| Scala| 95| Scalablah| 
| Spark| 92| Sparkblah| 
+--------+----+-------------+ 

myudf = udf(incrementAC,StringType()) 
df.withColumn("lang_and_rank", expr("concat(language,myudf())")).show() 

.utils.AnalysisException: u'undefined function myudf;' 

答えて

2

機能が登録されなければならない。

変換から使用
spark.udf.register("incrementAC", incrementAC) 

またaccumualtorsは信頼できません。

1

from pyspark.sql.functions import udf, expr, concat, col 
from pyspark.sql.types import StringType 

ac = sc.accumulator(0) 

def incrementAC(): 
    ac.add(1) 
    return str(ac) 

#sample data 
df = sc.parallelize([('Java',90),('Scala',95),('Spark',92)]).toDF(["language","rank"]) 

アプローチ1:

#solution using usual udf definition 
myudf = udf(incrementAC, StringType()) 
df.withColumn("lang_and_rank", concat(col('language'), myudf())).show() 

アプローチ2:

#another solution if you want to use 'expr' (as rightly pointed out by @user9132725) 
sqlContext.udf.register("myudf", incrementAC, StringType()) 
df = df.withColumn("lang_and_rank", expr("concat(language, myudf())")) 
df.show() 

出力は:

+--------+----+-------------+ 
|language|rank|lang_and_rank| 
+--------+----+-------------+ 
| Java| 90|  Java1| 
| Scala| 95|  Scala1| 
| Spark| 92|  Spark2| 
+--------+----+-------------+