コメントで既に述べたように、データフレームに参加することは、方法です。
ルックアップを使用できますが、ルックアップテーブルをドライバメモリに収集する必要がある、「分散型」ソリューションはないと思います。
import org.apache.spark.sql.functions._
import sqlContext.implicits._
import scala.collection.Map
val emp = Seq((1,"John"),(2,"David"))
val deps = Seq((1,"Admin",1),(2,"HR",2))
val empRdd = sc.parallelize(emp)
val depsDF = sc.parallelize(deps).toDF("DepID","Name","EmpID")
val lookupMap = empRdd.collectAsMap()
def lookup(lookupMap:Map[Int,String]) = udf((empID:Int) => lookupMap.get(empID))
val combinedDF = depsDF
.withColumn("empNames",lookup(lookupMap)($"EmpID"))
私の最初に考えたのはUDFにempRdd
を渡し、PairRDD
に定義されたlookup
メソッドを使用していたが、あなたは火花を持つことができないので、これは当然のことながら動作しません。また、このアプローチは、EmpIDをが一意であることを前提としています変換(すなわち、UDF)内のアクション(すなわち、lookup
)。
EDIT:
あなたempDfは、複数の列(例えば名前、年齢)を持っている場合、あなたはこの
val empRdd = empDf.rdd.map{row =>
(row.getInt(0),(row.getString(1),row.getInt(2)))}
val lookupMap = empRdd.collectAsMap()
def lookup(lookupMap:Map[Int,(String,Int)]) =
udf((empID:Int) => lookupMap.lift(empID))
depsDF
.withColumn("lookup",lookup(lookupMap)($"EmpID"))
.withColumn("empName",$"lookup._1")
.withColumn("empAge",$"lookup._2")
.drop($"lookup")
.show()
を使用することができますあなたが必要なものは、二つのデータフレームを「参加」されています... 1が非常にある場合小さい場合は、ブロードキャスト結合を使用します。 –
これまでのコード例はありますか? –
これは、「結合」が意味することです。何か他の方法でこれを実装しようとする理由は何ですか? – maasg