私はハイブテーブルからsparkデータフレームにデータをロードしていて、今では1つのデータフレームにすべてのユニークなacctを、別のデータフレームにすべての重複を必要とする問題に取り組んでいます。例えば私がacct id 1,1,2,3,4を持っているならば。 1つのデータフレームに2,3,4、もう1つに1,1を取得したいと考えています。これどうやってするの?Spark Dataframeでは、2つのデータフレームで重複したレコードと別個のレコードを取得する方法はありますか?
3
A
答えて
5
val acctDF = List(("1", "Acc1"), ("1", "Acc1"), ("1", "Acc1"), ("2", "Acc2"), ("2", "Acc2"), ("3", "Acc3")).toDF("AcctId", "Details")
scala> acctDF.show()
+------+-------+
|AcctId|Details|
+------+-------+
| 1| Acc1|
| 1| Acc1|
| 1| Acc1|
| 2| Acc2|
| 2| Acc2|
| 3| Acc3|
+------+-------+
val countsDF = acctDF.map(rec => (rec(0), 1)).reduceByKey(_+_).map(rec=> (rec._1.toString, rec._2)).toDF("AcctId", "AcctCount")
val accJoinedDF = acctDF.join(countsDF, acctDF("AcctId")===countsDF("AcctId"), "left_outer").select(acctDF("AcctId"), acctDF("Details"), countsDF("AcctCount"))
scala> accJoinedDF.show()
+------+-------+---------+
|AcctId|Details|AcctCount|
+------+-------+---------+
| 1| Acc1| 3|
| 1| Acc1| 3|
| 1| Acc1| 3|
| 2| Acc2| 2|
| 2| Acc2| 2|
| 3| Acc3| 1|
+------+-------+---------+
val distAcctDF = accJoinedDF.filter($"AcctCount"===1)
scala> distAcctDF.show()
+------+-------+---------+
|AcctId|Details|AcctCount|
+------+-------+---------+
| 3| Acc3| 1|
+------+-------+---------+
val duplAcctDF = accJoinedDF.filter($"AcctCount">1)
scala> duplAcctDF.show()
+------+-------+---------+
|AcctId|Details|AcctCount|
+------+-------+---------+
| 1| Acc1| 3|
| 1| Acc1| 3|
| 1| Acc1| 3|
| 2| Acc2| 2|
| 2| Acc2| 2|
+------+-------+---------+
(OR scala> duplAcctDF.distinct.show())
+0
私が間違っているが、データフレーム上でmapまたはreduceByKeyを操作できない場合は、irを最初にrddに変換し、最後にDFに戻す必要があります。このようなもの: 'val countsDF = acctDF.rdd.map .... toDF()' – Emiliano
1
あなたが持っているスパークのバージョンによっては、/ SQL以下のようなデータセットにウィンドウ関数を使用することができます。
Dataset<Row> New = df.withColumn("Duplicate", count("*").over(Window.partitionBy("id")));
Dataset<Row> Dups = New.filter(col("Duplicate").gt(1));
Dataset<Row> Uniques = New.filter(col("Duplicate").equalTo(1));
上記Javaで書かれています。スカラーで似ていて、Pythonでやる方法を読んでください。 https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
関連する問題
- 1. Spark Dataframeで複数のキーを持つ重複レコードをフィルタリングする方法は?
- 2. クイックビューアで別個のレコードを取得する方法は?
- 3. spark dataframeでレコードの入力ファイル名を取得する方法は?
- 4. SQLで重複したレコードを区別する方法は?
- 5. ファクトリーガールの2つの方法でレコードを取得する方法はありますか?
- 6. 1つのSQLを使用して複数のレコードまたは複数のSQLを取得して個別のレコードを取得する
- 7. パンダのデータフレームの重複除外と最新のレコードの取得
- 8. Yiiデータプロバイダからn個のレコードを取得する方法は?
- 9. 結果から重複レコードをクリアする方法はありますか?
- 10. mongodbで照会して個数別レコードを取得する方法
- 11. 2つの日時の間にデータベースからレコードを取得する方法はありますか?
- 12. grailsの最後の2つのレコードを効率的に取得する方法はありますか?
- 13. 2つのテーブルから別のレコードを取得するSQLクエリ
- 14. Python:2つのCSVファイル間で古いレコードと重複を取り除く
- 15. COL1とCOL2でソートされた別個のレコードを取得できません
- 16. 別個のレコードを持つIQueryableと注文しました
- 17. レコードはmysqlに重複したIDで取得されます(条件内)?
- 18. 別個のレコード
- 19. djangoクエリーセットで次のn個のレコードを取得するには?
- 20. PHPエコーjsonは2つのカテゴリと複数のレコードしかありません
- 21. WSO2 DSSを使用してmongodbから2番目のレコードを取得する方法はありますか?
- 22. ランダムなレコードと特定のレコードを1つのクエリで取得しますか?
- 23. Sparkで2つ(またはそれ以上)のDataFrameを圧縮する方法
- 24. db.run_in_transactionレコードで作成したキーを取得する方法は?
- 25. sqliteからレコードを1つずつ取得する方法は?
- 26. marklogicで「N」個のレコードを取得
- 27. SQL:このクエリで重複レコードを取得する
- 28. このシナリオで結合後にレコードが重複しないようにする方法はありますか?
- 29. テーブルを照会してレコードのシーケンスまたはチェーンを取得する方法はありますか?
- 30. 複数の複製のレコードを1つ取得するには
ロジック:IDによる集計& 'df_unique = DF id_count = 1 '、' df_dup = DF id_count> 1' – David
こんにちはデビッド、元のデータフレームの上にカウントに参加し、カウント..私はくぼみそれを得ます私はdf.agg( "acctid")を取得する必要がありますか?count()? –