のUDFは可変引数をサポートしていません
おかげ*がありますが、包まれた列の任意の数を渡すことができます
import org.apache.spark.sql.functions.{udf, array, lit}
val myConcatFunc = (xs: Seq[Any], sep: String) =>
xs.filter(_ != null).mkString(sep)
val myConcat = udf(myConcatFunc)
使用例:
関数を使用して生のSQLで
val df = sc.parallelize(Seq(
(null, "a", "b", "c"), ("d", null, null, "e")
)).toDF("x1", "x2", "x3", "x4")
val cols = array($"x1", $"x2", $"x3", $"x4")
val sep = lit("-")
df.select(myConcat(cols, sep).alias("concatenated")).show
// +------------+
// |concatenated|
// +------------+
// | a-b-c|
// | d-e|
// +------------+
:
df.registerTempTable("df")
sqlContext.udf.register("myConcat", myConcatFunc)
sqlContext.sql(
"SELECT myConcat(array(x1, x2, x4), '.') AS concatenated FROM df"
).show
// +------------+
// |concatenated|
// +------------+
// | a.c|
// | d.e|
// +------------+
Aもう少し複雑なアプローチは、すべてのUDFを使用して、おおよそ次のようなものでSQL式を構成されていません。
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
def myConcatExpr(sep: String, cols: Column*) = regexp_replace(concat(
cols.foldLeft(lit(""))(
(acc, c) => when(c.isNotNull, concat(acc, c, lit(sep))).otherwise(acc)
)
), s"($sep)?$$", "")
df.select(
myConcatExpr("-", $"x1", $"x2", $"x3", $"x4").alias("concatenated")
).show
// +------------+
// |concatenated|
// +------------+
// | a-b-c|
// | d-e|
// +------------+
が、私はそれは価値がある疑問あなたがPySparkで作業しない限り、努力します。あなたは可変引数を使用して関数を渡す場合
*それは、すべてのシンタックスシュガーから剥ぎ取り、UDFはArrayType
を期待したことになります。たとえば:
def f(s: String*) = s.mkString
udf(f _)
は型になります。
UserDefinedFunction(<function1>,StringType,List(ArrayType(StringType,true)))
こんにちは、連結中にリテラルとして明示的にカラム名を渡さない限り、ありません... – Kalpesh
をカラム名を取得する方法はあります。 – zero323
こんにちは、ありがとう、同じの構文を共有してください。 – Kalpesh