2017-09-15 5 views
-1

私はSparkを初めて使用していますが、Spark Datasetから無効なレコードをフィルタリングしようとしています。 私のデータセットは、このようなものになります。私のロジックでスカラのカスタム関数を使用したSparkデータセット

| Id | Curr| Col3 | 

| 1 | USD | 1111 | 
| 2 | CNY | 2222 | 
| 3 | USD | 3333 | 
| 1 | CNY | 4444 | 

を、各IDはvaild通貨を持っています。だから、基本的にid->currency

val map = Map(1 -> "USD", 2 -> "CNY")

のマップになります私は、Idが有効な通貨コードに対応していないデータセットから行をフィルタします。だから私のフィルタ操作の後に、データセットは、次のようになります。

| Id | Curr| Col3 | 

| 1 | USD | 1111 | 
| 2 | CNY | 2222 | 

私がここに持っている制限は、私がUDFを使用することができないということです。 誰かがこのためのフィルタ操作を考え出すのに手伝ってもらえますか?

答えて

3

あなたがmapのうち、データフレームを作成することができ、その後、インナーはそれをフィルタリングする元のデータフレームと結合します:

val map_df = map.toSeq.toDF("Id", "Curr") 
// map_df: org.apache.spark.sql.DataFrame = [Id: int, Curr: string] 

df.join(map_df, Seq("Id", "Curr")).show 
+---+----+----+ 
| Id|Curr|Col3| 
+---+----+----+ 
| 1| USD|1111| 
| 2| CNY|2222| 
+---+----+----+ 
+0

たぶん私の質問は十分に明確ではありませんでした。データセットに有効なIDがあり、無効な通貨コードの行がある可能性があります。 Like(1、CNY、333)。この場合、そのようなエントリも削除したいと思います。私はこの事件を反映するために質問を更新します。 –

+0

「Id」と通貨が同時に一致しますか? – Psidom

+0

はい、私は基本的に、有効な 'Id'と' Currency'情報を持つ行だけを残したいと思っています。不一致の 'Id'と' Currency'カラムを持つ行は削除する必要があります。 –

0
val a = List((1,"USD",1111),(2,"CAN",2222),(3,"USD",4444),(1,"CAN",5555)) 
val b = Map(1 -> "USD",2 -> "CAN") 
a.filter(x => b.keys.exists(_ == x._1)).filter(y => y._2 == b(y._1)) 
関連する問題