スパーク2.1+
あなたはSQLContext.registerJavaFunction
を使用することができます:それはSQL文で使用できるように
がJava UDFを登録します。
name
、Javaクラスの完全修飾名、およびオプションの戻り型が必要です。残念ながら、今それが唯一のSQL文(またはexpr
/selectExpr
付き)で使用することができるためとJava org.apache.spark.sql.api.java.UDF*
が必要です。
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % "2.1.0"
)
package com.example.spark.udfs
import org.apache.spark.sql.api.java.UDF1
class addOne extends UDF1[Integer, Integer] {
def call(x: Integer) = x + 1
}
sqlContext.registerJavaFunction("add_one", "com.example.spark.udfs.addOne")
sqlContext.sql("SELECT add_one(1)").show()
## +------+
## |UDF(1)|
## +------+
## | 2|
## +------+
版indpendent:
私はいないだろうそれがサポートされていると言うまでは行っているが、確かに可能である。現在PySparkで利用できるすべてのSQL関数は、単にScala APIのラッパーです。
私はSPARK SQL replacement for mysql GROUP_CONCAT aggregate functionへの答えとして作成しました、それがパッケージcom.example.udaf
に位置していますGroupConcat
UDAFを再利用したいと仮定しましょう:
from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql import Row
row = Row("k", "v")
df = sc.parallelize([
row(1, "foo1"), row(1, "foo2"), row(2, "bar1"), row(2, "bar2")]).toDF()
def groupConcat(col):
"""Group and concatenate values for a given column
>>> df = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
>>> df.select(groupConcat("v").alias("vs"))
[Row(vs=u'foo,bar')]
"""
sc = SparkContext._active_spark_context
# It is possible to use java_import to avoid full package path
_groupConcat = sc._jvm.com.example.udaf.GroupConcat.apply
# Converting to Seq to match apply(exprs: Column*)
return Column(_groupConcat(_to_seq(sc, [col], _to_java_column)))
df.groupBy("k").agg(groupConcat("v").alias("vs")).show()
## +---+---------+
## | k| vs|
## +---+---------+
## | 1|foo1,foo2|
## | 2|bar1,bar2|
## +---+---------+
私の好みのためにしかし、あなたのようにあまりにも多くの先頭のアンダースコアがありますそれができることがわかります。関連
:
は、私は次のことをやって、私は "py4j.protocol.Py4JError" に遭遇するたびにしています:com.exampleを。 udf.GroupConcat.applyがJVMに存在しません。 私のパッケージは "com.example.udf" – StarLord
@ArnabSharmaこれは通常、間違ったCLASSPATHを意味します – zero323
私はenum定数とUDFを持つjarを持っています。どのようにこのコードを使用して変更するには? – dksahuji