2017-10-19 4 views
0

私は非常にスケーラとスパークには新しく、始める方法がわかりません。Spark Scala別のRDDの列に基づいて1つのRDDの行を削除します

私はこのようになり、1つのRDDを持っている:

1,2,3,11 
2,1,4,12 
1,4,5,13 
3,5,6,12 

次のようになり、その別:

2,1 
1,2 

私はそれが一致している任意の行を削除するような最初のRDDをフィルタリングします第2のRDDの最初の2つの列。彼らはrddの、より作り付けの機能と従来のデータベースに類似してフォームを最適化しているように私は個人的にdataframe/dataset方法を好む

1,4,5,13 
3,5,6,12 

答えて

1
// input rdds 
val rdd1 = spark.sparkContext.makeRDD(Seq((1,2,3,11), (2,1,3,12), (1,4,5,13), (3,5,6,12))) 
val rdd2 = spark.sparkContext.makeRDD(Seq((1,2), (2,1))) 

// manipulate the 2 rdds as a key, val pair 
// the key of the first rdd is a tuple pair of first two fields, the val contains all the fields 
// the key of the second rdd is a tuple of first two fields, the val is just null 
// then we could perform joins on their key 
val rdd1_key = rdd1.map(record => ((record._1, record._2), record)) 
val rdd2_key = rdd2.map(record => (record, null)) 

// 1. perform left outer join, the record become (key, (val1, val2)) 
// 2. filter, keep those records which do not have a join 
// if there is no join, val2 will be None, otherwise val2 will be null, which is the value we hardcoded from previous step 
// 3. get val1 
rdd1_key.leftOuterJoin(rdd2_key) 
    .filter(record => record._2._2 == None) 
    .map(record => record._2._1) 
    .collect().foreach(println(_)) 

// result 
(1,4,5,13) 
(3,5,6,12) 

おかげ

1

:ような出力が見えるはずです。

dataframe方法です以下:

最初のステップは、第2のステップは

をチェックするフィルタリング条件のため dataframe2で新しい columnを追加することです rdds

dataframesから
import sqlContext.implicits._ 
val df1 = rdd1.toDF("col1", "col2", "col3", "col4") 
val df2 = rdd2.toDF("col1", "col2") 

の両方を変換することです

import org.apache.spark.sql.functions._ 
val tempdf2 = df2.withColumn("check", lit("check")) 

最後のステップはです2つは、rowscolumnsの2つを含む。

val finalDF = df1.join(tempdf2, Seq("col1", "col2"), "left") 
          .filter($"check".isNull) 
          .drop($"check") 

あなたは今、あなたはどちらかfinalDF.rddを使用してrddに変換することができ、最終的なdataframe

+----+----+----+----+ 
|col1|col2|col3|col4| 
+----+----+----+----+ 
|3 |5 |6 |12 | 
|1 |4 |5 |13 | 
+----+----+----+----+ 

として持つべきか、 dataframe自身とあなたの更なる処理を継続することができます。

回答が役に立ちそうです。

関連する問題