2016-10-13 6 views
3

私はハイブテーブルからsparkデータフレームにデータをロードしていて、今では1つのデータフレームにすべてのユニークなacctを、別のデータフレームにすべての重複を必要とする問題に取り組んでいます。例えば私がacct id 1,1,2,3,4を持っているならば。 1つのデータフレームに2,3,4、もう1つに1,1を取得したいと考えています。これどうやってするの?Spark Dataframeでは、2つのデータフレームで重複したレコードと別個のレコードを取得する方法はありますか?

+1

ロジック:IDによる集計& 'df_unique = DF id_count = 1 '、' df_dup = DF id_count> 1' – David

+0

こんにちはデビッド、元のデータフレームの上にカウントに参加し、カウント..私はくぼみそれを得ます私はdf.agg( "acctid")を取得する必要がありますか?count()? –

答えて

5
val acctDF = List(("1", "Acc1"), ("1", "Acc1"), ("1", "Acc1"), ("2", "Acc2"), ("2", "Acc2"), ("3", "Acc3")).toDF("AcctId", "Details") 
scala> acctDF.show() 
+------+-------+ 
|AcctId|Details| 
+------+-------+ 
|  1| Acc1| 
|  1| Acc1| 
|  1| Acc1| 
|  2| Acc2| 
|  2| Acc2| 
|  3| Acc3| 
+------+-------+ 

val countsDF = acctDF.map(rec => (rec(0), 1)).reduceByKey(_+_).map(rec=> (rec._1.toString, rec._2)).toDF("AcctId", "AcctCount") 

val accJoinedDF = acctDF.join(countsDF, acctDF("AcctId")===countsDF("AcctId"), "left_outer").select(acctDF("AcctId"), acctDF("Details"), countsDF("AcctCount")) 

scala> accJoinedDF.show() 
+------+-------+---------+ 
|AcctId|Details|AcctCount| 
+------+-------+---------+ 
|  1| Acc1|  3| 
|  1| Acc1|  3| 
|  1| Acc1|  3| 
|  2| Acc2|  2| 
|  2| Acc2|  2| 
|  3| Acc3|  1| 
+------+-------+---------+ 


val distAcctDF = accJoinedDF.filter($"AcctCount"===1) 
scala> distAcctDF.show() 
+------+-------+---------+ 
|AcctId|Details|AcctCount| 
+------+-------+---------+ 
|  3| Acc3|  1| 
+------+-------+---------+ 

val duplAcctDF = accJoinedDF.filter($"AcctCount">1) 
scala> duplAcctDF.show() 
+------+-------+---------+     
|AcctId|Details|AcctCount| 
+------+-------+---------+ 
|  1| Acc1|  3| 
|  1| Acc1|  3| 
|  1| Acc1|  3| 
|  2| Acc2|  2| 
|  2| Acc2|  2| 
+------+-------+---------+ 

(OR scala> duplAcctDF.distinct.show()) 
+0

私が間違っているが、データフレーム上でmapまたはreduceByKeyを操作できない場合は、irを最初にrddに変換し、最後にDFに戻す必要があります。このようなもの: 'val countsDF = acctDF.rdd.map .... toDF()' – Emiliano

1

あなたが持っているスパークのバージョンによっては、/ SQL以下のようなデータセットにウィンドウ関数を使用することができます。

Dataset<Row> New = df.withColumn("Duplicate", count("*").over(Window.partitionBy("id"))); 

Dataset<Row> Dups = New.filter(col("Duplicate").gt(1)); 

Dataset<Row> Uniques = New.filter(col("Duplicate").equalTo(1)); 

上記Javaで書かれています。スカラーで似ていて、Pythonでやる方法を読んでください。 https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

関連する問題