2017-09-22 9 views
1

私は2つのケースクラスと1つのRDDを持っています。別のRDDのIDのメンバーシップに基づく別個のRDD

私は2つの新しいRDD [Thing1] Sを作成したい
case class Thing1(Id: String, a: String, b: String, c: java.util.Date, d: Double) 
case class Thing2(Id: String, e: java.util.Date, f: Double) 

val rdd1 = // Loads an rdd of type RDD[Thing1] 
val rdd2 = // Loads an rdd of type RDD[Thing2] 

、1要素はRDD2内のIDが存在していRDD1の要素を含み、RDD1の要素が含まれている別の要素がIDを持っていない場合RDD2

に存在するがここで(Scala Spark contains vs. does not contain、これを見て、他のスタックオーバーフローの記事が、どれが働いていない)私が試してみました何

val rdd2_ids = rdd2.map(r => r.Id) 
val rdd1_present = rdd1.filter{case r => rdd2 contains r.Id} 

val rdd1_absent = rdd1.filter{case r => !(rdd2 contains r.Id)} 

しかし、これは私が多くを見てきましたエラーerror: value contains is not a member of org.apache.spark.rdd.RDD[String] を取得します私がやろうとしていることと似たようなことをする方法を尋ねる質問はありますが、誰も私のために働いていません。私は多くのエラーvalue _____ is not a member of org.apache.spark.rdd.RDD[String]を取得します。

なぜ私のためにこれらの答えがうまくいかず、私は何をしようとしていますか?

+0

なぜRDDの代わりにデータフレームを使用できないのですか?データフレームは、2つのデータフレーム間の差異を検出する機能を除いて提供します。 –

答えて

0

私は今、あなたはあなたが両方に共通の値を検索するそれぞれの要素によってそれらを結合することができ、2つの単純なRDDS

val rdd1 = sc.parallelize(Array(
    | Thing1(1,2), 
    | Thing1(2,3), 
    | Thing1(3,4))) 
rdd1: org.apache.spark.rdd.RDD[Thing1] = ParallelCollectionRDD[174] at parallelize 

val rdd2 = sc.parallelize(Array(
    | Thing2(1, "Two"), 
    | Thing2(2, "Three"))) 
rdd2: org.apache.spark.rdd.RDD[Thing2] = ParallelCollectionRDD[175] at parallelize 

を作成しました:

val rdd1_present = rdd1.keyBy(_.a).join(rdd2.keyBy(_.a)).map{ case(a, (t1, t2)) => t1 } 

//rdd1_present.collect 
//Array[Thing1] = Array(Thing1(2,3), Thing1(1,2)) 

val rdd1_absent = rdd1.keyBy(_.a).subtractByKey(rdd1_present.keyBy(_.a)).map{ case(a,t1) => t1 } 

//rdd1_absent.collect 
//Array[Thing1] = Array(Thing1(3,4)) 
0

完全外部join-はしてみてください
val joined = rdd1.map(s=>(s.id,s)).fullOuterJoin(rdd2.map(s=>(s.id,s))).cache() 

//only in left 
joined.filter(s=> s._2._2.isEmpty).foreach(println) 

//only in right 
joined.filter(s=>s._2._1.isEmpty).foreach(println) 

//in both 
joined.filter(s=> !s._2._1.isEmpty && !s._2._2.isEmpty).foreach(println) 
関連する問題