// input rdds
val rdd1 = spark.sparkContext.makeRDD(Seq((1,2,3,11), (2,1,3,12), (1,4,5,13), (3,5,6,12)))
val rdd2 = spark.sparkContext.makeRDD(Seq((1,2), (2,1)))
// reshape the two rdds into (key, value) pairs
// the key of the first rdd is a tuple of its first two fields; the value keeps the whole record
// the key of the second rdd is the record itself (its two fields); the value is just a null placeholder
// then we can join them on the key
val rdd1_key = rdd1.map(record => ((record._1, record._2), record))
val rdd2_key = rdd2.map(record => (record, null))
// 1. perform a left outer join; each record becomes (key, (val1, Option(val2)))
// 2. filter, keeping only the records that did not find a match:
//    if there is no match, the second element is None; otherwise it is Some(null),
//    wrapping the null placeholder we hardcoded in the previous step
// 3. keep only val1, the original record
rdd1_key.leftOuterJoin(rdd2_key)
  .filter(record => record._2._2.isEmpty)
.map(record => record._2._1)
.collect().foreach(println(_))
// result
(1,4,5,13)
(3,5,6,12)
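
// alternative: a minimal sketch reusing rdd1_key and rdd2_key from above.
// PairRDDFunctions also provides subtractByKey, which expresses this anti join
// directly and skips the explicit leftOuterJoin + filter
rdd1_key.subtractByKey(rdd2_key)
  .map(record => record._2)
  .collect().foreach(println(_))
// prints the same two records: (1,4,5,13) and (3,5,6,12)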
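
// for completeness, a sketch of the same anti join with the DataFrame API,
// which has a built-in "left_anti" join type; the column names f1..f4 are
// illustrative assumptions, not part of the original code
import spark.implicits._
val df1 = rdd1.toDF("f1", "f2", "f3", "f4")
val df2 = rdd2.toDF("f1", "f2")
df1.join(df2, Seq("f1", "f2"), "left_anti").show()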