2017-07-26 9 views
0

を拒否したセーブ:スパークScalaのRDD /データフレームを濾過し、私はこのようなファイルを持っているデータ

id,insert_date,name 
==================== 
1,20170620,abc 
2,20170620,xyz 
1,20170621,pqr 
3,20170624,huy 
,20170624,stu 

は、私がnullのIDを持つレコードをフィルタする必要があります。また、idが繰り返されている場合は、max(insert_date)でレコードを選択する必要があります。

私はこのようにやっている:

val myDF = sqlContext.read.format("com.databricks.spark.csv").option("delimiter",",") 
    .schema(myschema) 
    .load(mypath) 
myDF.registerTempTable("myTable") 
val myFilteredDF=sqlContext.sql("""SELECT id,max(insert_date),name 
       FROM myTable GROUP BY id,name""").filter("length(id) >0" 
myFilteredDF.show() 

は私が望んでいた道を結果得ます。しかし同時に、拒否されたレコードファイルに書き込むために、却下された/フィルタリングされたレコードを別のDataFrame/RDDに取得する必要があります。ここで最高の解決策は何ですか?私は、私は除外する何の反対を行うことができます理解し、それがよりよい解決策のように見えない

+0

、残念ながら私にはありませんhave – user3124284

答えて

1

あなたがexcept試みることができる:見てくれてありがとう@mtoto

val otherDF = myDF.except(myFilteredDF) 
+0

同じレコードがある場合、最大/グループ別に – user3124284

+1

でフィルタリングされた2つの同一のレコードがある場合は、拒否されたレコードをすべて取得しないようにmtotoさんに感謝します。あなたのプロジェクトで冗長性を収集するのが好きでない限り。 – mtoto

関連する問題