。
あなたはこのライブラリがSpark 1.3+
とところでを必要とApacheのスパークしかしためCSV Data Sourceを使用することができます。このライブラリはSpark 2.x
にインライン展開されています。
しかし、我々は同様のものを分析して実装することができます。我々はcom/databricks/spark/csv/DefaultSource.scala
に見ると
val useHeader = parameters.getOrElse("header", "false")
、我々は最初の行はRDD
に一度だけであると仮定した場合、その後com/databricks/spark/csv/CsvRelation.scalaでそう
// If header is set, make sure firstLine is materialized before sending to executors.
val filterLine = if (useHeader) firstLine else null
baseRDD().mapPartitions { iter =>
// When using header, any input line that equals firstLine is assumed to be header
val csvIter = if (useHeader) {
iter.filter(_ != filterLine)
} else {
iter
}
parseCSV(csvIter, csvFormat)
ありあり(私たちのCSV行)以下の例のようにすることができます:
CSVサンプルファイル:
Latitude,Longitude,Name
48.1,0.25,"First point"
49.2,1.1,"Second point"
47.5,0.75,"Third point"
scala> val csvData = sc.textFile("test.csv")
csvData: org.apache.spark.rdd.RDD[String] = test.csv MapPartitionsRDD[24] at textFile at <console>:24
scala> val header = csvDataRdd.first
header: String = Latitude,Longitude,Name
scala> val csvDataWithoutHeaderRdd = csvDataRdd.mapPartitions{iter => iter.filter(_ != header)}
csvDataWithoutHeaderRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[25] at mapPartitions at <console>:28
scala> csvDataWithoutHeaderRdd.foreach(println)
49.2,1.1,"Second point"
48.1,0.25,"First point"
47.5,0.75,"Third point"
Uが使用されているスパークのバージョン? – VladoDemcak
@VladoDemcak:残念ながら、火花1.0.0。アップグレードするまで、今すぐrddに固執する必要があります。 – user1189851