DataFrame
私は、複数のUDFを動的に作成して、特定の行が一致するかどうかを判断したいと考えています。今私はただ一つの例を試しています。私のテストコードは以下のようになります。SparkでUDFを動的に作成するにはどうすればよいですか?
//create the dataframe
import spark.implicits._
val df = Seq(("t","t"), ("t", "f"), ("f", "t"), ("f", "f")).toDF("n1", "n2")
//create the scala function
def filter(v1: Seq[Any], v2: Seq[String]): Int = {
for (i <- 0 until v1.length) {
if (!v1(i).equals(v2(i))) {
return 0
}
}
return 1
}
//create the udf
import org.apache.spark.sql.functions.udf
val fudf = udf(filter(_: Seq[Any], _: Seq[String]))
//apply the UDF
df.withColumn("filter1", fudf(Seq($"n1"), Seq("t"))).show()
ただし、最後の行を実行すると、次のエラーが発生します。
:30: error: not found: value df df.withColumn("filter1", fudf($"n1", Seq("t"))).show() ^ :30: error: type mismatch; found : Seq[String] required: org.apache.spark.sql.Column df.withColumn("filter1", fudf($"n1", Seq("t"))).show() ^
私が間違っていることについてのアイデアはありますか?注意してください、私はScala v2.11.xとSpark 2.0.xです。
この「動的な」UDF質問/懸案事項を解決できれば、データフレームに追加することができます。いくつかのテストコードでは、それは永遠に(それは完了していない、私はctrl-cを打ち破らなければならなかった)。私は、ループの中で.withColumn
の束をやっているのは、Sparkの悪い考えです。もしそうなら、私に知らせてください。私はこのアプローチを完全に放棄します。
import spark.implicits._
val df = Seq(("t","t"), ("t", "f"), ("f", "t"), ("f", "f")).toDF("n1", "n2")
import org.apache.spark.sql.functions.udf
val fudf = udf((x: String) => if (x.equals("t")) 1 else 0)
var df2 = df
for (i <- 0 until 10000) {
df2 = df2.withColumn("filter"+i, fudf($"n1"))
}
ないあなたはここで達成しようとしているのかわから、あなたの列のエントリは基本的に、私は列の任意の数(例えば、 '$」を取るUDFを作成しようとしている –
文字列ではない配列であります(例えば、 "t"、 "f"、...、 "t")、そしてUDFの内部では、if( "n1"、 "$ n2"、...、$ n10 " 'n1 =" t "、n2 =" f "... n10 =" t "'である。 –
このタイプの "フィルタリング"は簡単に 'df.where(" n1 = 't'とn2 = 'f' ...とn10 = 't')。count 'で直列に行うことができますが、そのアプローチは並列化できませんこのようなフィルタは、Sparkアクションを実行するたびに順次実行する必要があります。 –