2016-10-17 4 views
0

パスからrddに複数のcsvを読み込もうとしています。このパスには多くのcsvがあります。すべてのcsvをrddに読み込みながらヘッダを避ける方法はありますか?フィルタを使用したり、個々のcsvを個別に処理したりしなくても、spotsRDDを使用してヘッダを省略することができます。あなたが火花1.0.0を使用している残念です複数のファイルをスカラーのrddに読み込みながらcsvのヘッダーをスキップ

val path ="file:///home/work/csvs/*" 
    val spotsRDD= sc.textFile(path) 
    println(spotsRDD.count()) 

おかげ

+0

Uが使用されているスパークのバージョン? – VladoDemcak

+0

@VladoDemcak:残念ながら、火花1.0.0。アップグレードするまで、今すぐrddに固執する必要があります。 – user1189851

答えて

1

あなたはこのライブラリSpark 1.3+とところでを必要とApacheのスパークしかしためCSV Data Sourceを使用することができます。このライブラリはSpark 2.xにインライン展開されています。

しかし、我々は同様のものを分析して実装することができます。我々はcom/databricks/spark/csv/DefaultSource.scalaに見ると

val useHeader = parameters.getOrElse("header", "false") 

、我々は最初の行はRDDに一度だけであると仮定した場合、その後com/databricks/spark/csv/CsvRelation.scalaでそう

// If header is set, make sure firstLine is materialized before sending to executors. 
val filterLine = if (useHeader) firstLine else null 

baseRDD().mapPartitions { iter => 
// When using header, any input line that equals firstLine is assumed to be header 
val csvIter = if (useHeader) { 
    iter.filter(_ != filterLine) 
} else { 
    iter 
} 
parseCSV(csvIter, csvFormat) 

ありあり(私たちのCSV行)以下の例のようにすることができます:

CSVサンプルファイル:

Latitude,Longitude,Name 
48.1,0.25,"First point" 
49.2,1.1,"Second point" 
47.5,0.75,"Third point" 

scala> val csvData = sc.textFile("test.csv") 
csvData: org.apache.spark.rdd.RDD[String] = test.csv MapPartitionsRDD[24] at textFile at <console>:24 

scala> val header = csvDataRdd.first 
header: String = Latitude,Longitude,Name 

scala> val csvDataWithoutHeaderRdd = csvDataRdd.mapPartitions{iter => iter.filter(_ != header)} 
csvDataWithoutHeaderRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[25] at mapPartitions at <console>:28 

scala> csvDataWithoutHeaderRdd.foreach(println) 
49.2,1.1,"Second point" 
48.1,0.25,"First point" 
47.5,0.75,"Third point" 
+0

複数のcsvファイルを1つのrddで読み込んだ場合、これは動作しません.. ?? – RockSolid

関連する問題