2017-03-13 10 views
2

I有し、以下の構造を有するCSV:フィルタRDD線スカラ

ヘッダ、ヘッダ、ヘッダ、ヘッダを、ヘッダ
VAL1、VAL2、val3は、val4、val5
VAL1 、VAL2、ヌル、val4、val5
VAL1、VAL2、val3は、ヌル、val5

私は何をする必要がどのようなヘッダと特定の位置にヌル値を含むデータの行をフィルタである(それはOKをクリックするとval3ではnullが返されますが、val4ではnullになりません)。私はrddを作成し、コンマで行を分割し、私の希望は配列のインデックス位置のように各行にアクセスすることでした。しかし、私は比較を行う方法を見つけることができません。私はフィールドを抽出することができます

rdd.map(値=>(値は(2))

がどのように比較を行うのです。特に「が含まれていない」私はそこだと思いますか?。 !?比較方法が利用可能であるか、この問題が含まれているタプルを必要としない

+1

RDDを使用する必要があるという制約はありますか?私が思っていない場合は、DataFrameを使用することができます。 SPARKのDataFrame APIは、CSVファイルの操作を処理するのに最適です。 – Tawkir

答えて

3

あなたはそれらの値をラップするためのタイプを定義したと仮定すると、のは言わせて:

case class Record(val1: String, val2: Option[String], val3: String, val4: Option[String]) 

val rdd: RDD[Record] = ... 
rdd.filter(record => record.val2.isDefined && record.val4.isDefined) 

私はこれが役に立ちそうです。

2

RDDの代わりにDataFrameを使用している場合は、filterとブール値Columnの操作を使用します。

val5もnullでないとします。あなたのcsvファイルは次のようになります場合は

は:

[email protected] ~ > cat dat_2.csv 
header1,header2,header3,header4,header5 
val1,val2,val3,val4,val5 
val1,val2,null,val4,val5 
val1,val2,val3,null,val5 

次に、あなたのコード:あなたのデータは次のようになります。それ以外の場合

scala> val dat_1 = spark.read.option("header", true).csv("dat_1.csv") 
dat_1: org.apache.spark.sql.DataFrame = [header1: string, header2: string ... 3 more fields] 

scala> dat_1.show 
+-------+-------+-------+-------+-------+ 
|header1|header2|header3|header4|header5| 
+-------+-------+-------+-------+-------+ 
| val1| val2| val3| val4| val5| 
| val1| val2| null| val4| val5| 
| val1| val2| val3| null| val5| 
+-------+-------+-------+-------+-------+ 


scala> data1.filter($"header4".isNotNull && $"header5".isNotNull).show 
+-------+-------+-------+-------+-------+ 
|header1|header2|header3|header4|header5| 
+-------+-------+-------+-------+-------+ 
| val1| val2| val3| val4| val5| 
| val1| val2| null| val4| val5| 
+-------+-------+-------+-------+-------+ 

[email protected] ~ > cat dat_1.csv 
header1,header2,header3,header4,header5 
val1,val2,val3,val4,val5 
val1,val2,,val4,val5 
val1,val2,val3,,val5 

次に、あなたのコードは次のようになります。次のようになります:

scala> val dat_2 = spark.read.option("header", true).csv("dat_2.csv") 
dat_2: org.apache.spark.sql.DataFrame = [header1: string, header2: string ... 3 more fields] 

scala> dat_2.show 
+-------+-------+-------+-------+-------+ 
|header1|header2|header3|header4|header5| 
+-------+-------+-------+-------+-------+ 
| val1| val2| val3| val4| val5| 
| val1| val2| null| val4| val5| 
| val1| val2| val3| null| val5| 
+-------+-------+-------+-------+-------+ 


scala> dat_2.filter($"header4" =!= "null" && $"header5" =!= "null").show 
+-------+-------+-------+-------+-------+ 
|header1|header2|header3|header4|header5| 
+-------+-------+-------+-------+-------+ 
| val1| val2| val3| val4| val5| 
| val1| val2| null| val4| val5| 
+-------+-------+-------+-------+-------+ 
関連する問題