2016-12-01 13 views
0

INPUT-スパークSQLクエリ

col_a col_b 
A  B 
D  B 
B  E 
C  A 

私はsparksqlを使用する方法を次のように出力を取得しようとしていますが、私は使用して出力を希望取得することができませんNOT EXITS /左外部結合。次の出力を得るために私を助けてください。

col_a col_b 
A   B 
D   B 
C   A 

col_bの値がcol_aにある場合、両方のテーブルから値を取得したいと考えています。あなたの列が大きすぎるではありません、私はこのような何かをするだろうと仮定すると

答えて

0

scala> val df = Seq(("A", "B"), ("D", "B"), ("B", "E"), ("C", "A")).toDF("col_a", "col_b") 
df: org.apache.spark.sql.DataFrame = [col_a: string, col_b: string] 

scala> df.show 
+-----+-----+ 
|col_a|col_b| 
+-----+-----+ 
| A| B| 
| D| B| 
| B| E| 
| C| A| 
+-----+-----+ 

scala> import org.apache.spark.sql.Row 
import org.apache.spark.sql.Row 

scala> import scala.collection.mutable.HashSet 
import scala.collection.mutable.HashSet 

scala> val col_a_vals = df.rdd.map{case Row(a: String, b: String) => a}.collect.toSeq 
col_a_vals: Seq[String] = WrappedArray(A, D, B, C) 

scala> val col_a_set = HashSet(col_a_vals :_*) 
col_a_set: scala.collection.mutable.HashSet[String] = Set(B, C, D, A) 

scala> val broad_set = sc.broadcast(col_a_set) 
broad_set: org.apache.spark.broadcast.Broadcast[scala.collection.mutable.HashSet[String]] = Broadcast(56) 

scala> val contains_col_a = udf((value: String) => broad_set.value.contains(value)) 
contains_col_a: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,BooleanType,List(StringType)) 

scala> df.filter(contains_col_a($"col_b")).show 
+-----+-----+ 
|col_a|col_b| 
+-----+-----+ 
| A| B| 
| D| B| 
| C| A| 
+-----+-----+ 
+0

私のデータセットが巨大です。そして、私はちょうど私の質問を提示するための例を掲示しました。 – Rushabh

関連する問題