2016-09-14 4 views
5

予測のために後で使用するIDとベクトルを使用してDataFrameを準備しています。私は自分のデータフレームにGROUPBYを行い、そして私のGROUPBYに私は新しい列にリストとして列のカップルをマージしています:これは私が私の特徴ベクトルとそのラベルを作成しています方法ですudf関数で十分な入力変数が受け入れられない場合のデータフレームの書き換え

def mergeFunction(...) // with 14 input variables 

val myudffunction(mergeFunction) // Spark doesn't support this 

df.groupBy("id").agg(
    collect_list(df(...)) as ... 
    ... // too many of these (something like 14 of them) 
).withColumn("features_labels", 
    myudffunction(
    col(...) 
    , col(...)) 
.select("id", "feature_labels") 

。これまでのところ私のために働いていますが、これは初めて、このメソッドの特徴ベクトルがSparkのudf関数が受け入れる最大値である10より大きくなっています。

私はこれをどのように修正できるかわかりません。 のudf入力のサイズは大きくなりますが、私は間違って理解していますか? 良い方法がありますか?

答えて

5

ユーザー定義関数は、最大22個のパラメーターに対して定義されています。例えば

val dummy = ((
    x0: Int, x1: Int, x2: Int, x3: Int, x4: Int, x5: Int, x6: Int, x7: Int, 
    x8: Int, x9: Int, x10: Int, x11: Int, x12: Int, x13: Int, x14: Int, 
    x15: Int, x16: Int, x17: Int, x18: Int, x19: Int, x20: Int, x21: Int) => 1) 

を登録することができます:

import org.apache.spark.sql.expressions.UserDefinedFunction 

Seq(1).toDF.select(UserDefinedFunction(dummy, IntegerType, None)(exprs: _*)) 
:また UserDefinedFunctionオブジェクトを作成することができます

val exprs = (0 to 21).map(_ => lit(1)) 
Seq(1).toDF.select(
    callUDF("dummy", exprs: _*).alias("dummy") 
) 

spark.udf.register("dummy", dummy) 

とSQLで使用するか、名前で呼ばを

実際に楽しい時を過す22個の引数を持つctionはあまり有用ではありません。これらを生成するためにScalaリフレクションを使用しない限り、メンテナンスの悪夢があります。

私はコレクション(array,map)またはstructを入力として使用するか、複数のモジュールに分割することを検討します。たとえば:

val aLongArray = array((0 to 256).map(_ => lit(1)): _*) 

val udfWitharray = udf((xs: Seq[Int]) => 1) 

Seq(1).toDF.select(udfWitharray(aLongArray).alias("dummy")) 
4

ただ、ゼロの答えに拡大する、10個の以上のパラメータを持つUDFで動作するように.withColumn()機能を得ることが可能です。 spark.udf.register()に関数を指定してから、udfの代わりに列を追加する引数にexprを使用するだけです。

例えば、このような何か作業をする必要があります:私はあなたが関数を呼び出すために配列を周りに渡すことに頼る必要はありだと思いませんので、

def mergeFunction(...) // with 14 input variables 

spark.udf.register("mergeFunction", mergeFunction) // make available in expressions 

df.groupBy("id").agg(
    collect_list(df(...)) as ... 
    ... // too many of these (something like 14 of them) 
).withColumn("features_labels", 
    expr("mergeFunction(col1, col2, col3, col4, ...)")) //pass in the 14 column names 
.select("id", "feature_labels") 

根本的表現パーサーが10個の以上のパラメータを処理するようです。また、それらのパラメータが異なるデータ型である場合、配列はうまく動作しません。

+0

ありがとうございました。この問題を処理するうまい方法。 –

関連する問題