2016-08-02 7 views
0

にフィールドの重複レコードを検索し、 "名前"、2016年、 "国" 11、 "NAME1"、2016年、 "COUNTRY1" 10、 "名前"、2016年、 "国" 10は、 "名前"、2016年には、 "国" 12は、 "Name2は"、2017年には、 "COUNTRY2"スパークは、私は、データが 10のように設定しているRDD

私の問題文は私が年によってカウントの合計数と重複を見つけなければならないです。私の結果は(年、総記録、重複)になるはずです 2016,4,3 2017,1,0。

私はそれが10ギガバイトのデータ件まで罰金を実行している

val records = rdd.map { 
       x => 
       val array = x.split(",") 
       (array(2),x) 
      }.groupByKey() 
val duplicates = records.map { 
       x => val totalcount = x._2.size 
         val duplicates = // find duplicates in iterator 
        (x._1,totalcount,duplicates) 
       } 

ことによって、この問題を解決しようとしてきました。私がそれをもっと多くのデータで実行した場合、それは長い時間がかかります。私はgroupByKeyが最善の方法ではないことに気付きました。

この問題を解決するには、最善の方法を提案してください。

答えて

0

私はあなたの例が示している方法で重複を数えるかなりのSQLの専門家ではありません。しかし、私はこれがデータフレームの使用を開始すると思います。私の理解は、データフレームがまっすぐなRDDよりもはるかに優れたパフォーマンスを発揮できるということです。

scala> import com.databricks.spark.csv._ 
import com.databricks.spark.csv._ 

scala> 

scala> val s = List("""10,"Name",2016,"Country"""", """11,"Name1",2016,"country1"""", """10,"Name",2016,"Country"""", """10,"Name",2016,"Country"""", """12,"Name2",2017,"Country2"""") 
s: List[String] = List(10,"Name",2016,"Country", 11,"Name1",2016,"country1", 10,"Name",2016,"Country", 10,"Name",2016,"Country", 12,"Name2",2017,"Country2") 

scala> val rdd = sc.parallelize(s) 
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[92] at parallelize at <console>:38 

scala> 

scala> val df = new CsvParser().withDelimiter(',').withInferSchema(true).withParseMode("DROPMALFORMED").csvRdd(sqlContext, rdd) 
df: org.apache.spark.sql.DataFrame = [C0: int, C1: string, C2: int, C3: string] 

scala> 

scala> df.registerTempTable("test") 

scala> 

scala> val dfCount = sqlContext.sql("select C2, count(*), count(distinct C0,C2,C1,C3) from test group by C2") 
dfCount: org.apache.spark.sql.DataFrame = [C2: int, _c1: bigint, _c2: bigint] 

scala> 

scala> dfCount.show 
+----+---+---+                 
| C2|_c1|_c2| 
+----+---+---+ 
|2016| 4| 2| 
|2017| 1| 1| 
+----+---+---+ 
関連する問題