2015-10-20 14 views
13

たとえば、私のチームがSparkで開発するための参照言語としてPythonを選択したとしましょう。しかし後でパフォーマンス上の理由から、Pythonコード(ScalaやJavaスケルトンを持つPythonスタブに似たもの)でそれらをマッピングするために、特定のScalaまたはJava固有のlibrairiesを開発したいと考えています。Spark:ScalaやJavaのユーザー定義関数でPythonをマッピングする方法は?

カスタマイズされた新しいPythonメソッドを、ScalaやJava User Defined Functionsの下で使用することは可能ですか?

答えて

18

スパーク2.1+

あなたはSQLContext.registerJavaFunctionを使用することができます:それはSQL文で使用できるように

がJava UDFを登録します。

name、Javaクラスの完全修飾名、およびオプションの戻り型が必要です。残念ながら、今それが唯一のSQL文(またはexpr/selectExpr付き)で使用することができるためとJava org.apache.spark.sql.api.java.UDF*が必要です。

scalaVersion := "2.11.8" 

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-sql" % "2.1.0" 
) 
package com.example.spark.udfs 

import org.apache.spark.sql.api.java.UDF1 

class addOne extends UDF1[Integer, Integer] { 
    def call(x: Integer) = x + 1 
} 
sqlContext.registerJavaFunction("add_one", "com.example.spark.udfs.addOne") 
sqlContext.sql("SELECT add_one(1)").show() 

## +------+ 
## |UDF(1)| 
## +------+ 
## |  2| 
## +------+ 

版indpendent

私はいないだろうそれがサポートされていると言うまでは行っているが、確かに可能である。現在PySparkで利用できるすべてのSQL関数は、単にScala APIのラッパーです。

私はSPARK SQL replacement for mysql GROUP_CONCAT aggregate functionへの答えとして作成しました、それがパッケージcom.example.udafに位置していますGroupConcat UDAFを再利用したいと仮定しましょう:

from pyspark.sql.column import Column, _to_java_column, _to_seq 
from pyspark.sql import Row 

row = Row("k", "v") 
df = sc.parallelize([ 
    row(1, "foo1"), row(1, "foo2"), row(2, "bar1"), row(2, "bar2")]).toDF() 

def groupConcat(col): 
    """Group and concatenate values for a given column 

    >>> df = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v")) 
    >>> df.select(groupConcat("v").alias("vs")) 
    [Row(vs=u'foo,bar')] 
    """ 
    sc = SparkContext._active_spark_context 
    # It is possible to use java_import to avoid full package path 
    _groupConcat = sc._jvm.com.example.udaf.GroupConcat.apply 
    # Converting to Seq to match apply(exprs: Column*) 
    return Column(_groupConcat(_to_seq(sc, [col], _to_java_column))) 

df.groupBy("k").agg(groupConcat("v").alias("vs")).show() 

## +---+---------+ 
## | k|  vs| 
## +---+---------+ 
## | 1|foo1,foo2| 
## | 2|bar1,bar2| 
## +---+---------+ 

私の好みのためにしかし、あなたのようにあまりにも多くの先頭のアンダースコアがありますそれができることがわかります。関連

+1

は、私は次のことをやって、私は "py4j.protocol.Py4JError" に遭遇するたびにしています:com.exampleを。 udf.GroupConcat.applyがJVMに存在しません。 私のパッケージは "com.example.udf" – StarLord

+2

@ArnabSharmaこれは通常、間違ったCLASSPATHを意味します – zero323

+0

私はenum定数とUDFを持つjarを持っています。どのようにこのコードを使用して変更するには? – dksahuji

関連する問題