spark DataFrameをcsvとしてディスクに保存する方法はありますか?
答えて
Apache Sparkはディスク上のネイティブCSV出力をサポートしていません。
あなたはしかし、4つの利用可能な解決策を持っている:
あなたはあなたのデータフレームは、RDDに変換することができます
def convertToReadableString(r : Row) = ??? df.rdd.map{ convertToReadableString }.saveAsTextFile(filepath)
これは、フォルダのファイルパスを作成します。ファイルパスの下には、パーティションのファイル(例えば一部-000 *)私は大きなCSVにすべてのパーティションを追加したい場合は、私は通常、何
が
cat filePath/part* > mycsvfile.csv
一部を使用するあるを見つけることができます
coalesce(1,false)
RDDから1つのパーティションを作成します。それはあなたが収集しているすべてのデータをプルすることによってドライバを圧倒する可能性があるので、通常悪い練習です。df.rdd
はRDD[Row]
を返します。あなたはDatabricks火花CSV library使用することができます。
スパーク1.4+:
df.write.format("com.databricks.spark.csv").save(filepath)
スパーク1.3:
df.save(filepath,"com.databricks.spark.csv")
ウィットをh Spark 2.x
spark-csv
パッケージはSparkに含まれているので不要です。df.write.format("csv").save(filepath)
あなたは地元のパンダのデータフレームに変換し、
to_csv
方法を使用することができます(PySparkのみ)。
注:ソリューション1、2及び3は、あなたがsave
を呼び出す際の呼び出しをスパーク基礎となるのHadoop APIによって生成されたCSV形式ファイル(part-*
)になります。 1つのパーティションに1つのpart-
ファイルがあります。
同様の問題がありました。私は、クライアントモードでクラスタに接続している間、ドライバにcsvファイルを書き留める必要がありました。
潜在的なエラーを避けるために、Apache Sparkと同じCSV解析コードを再利用したいと思っていました。
私はspark-csvコードをチェックして、データフレームをraw csv RDD[String]
に変換するコードがcom.databricks.spark.csv.CsvSchemaRDD
であることを発見しました。
悲しいことに、悲しいことに、sc.textFile
と関連するメソッドの最後でハードコードされています。
そのコードをコピーして最後の行をsc.textFile
で削除し、代わりにRDDを直接返しました。
マイコード:
/*
This is copypasta from com.databricks.spark.csv.CsvSchemaRDD
Spark's code has perfect method converting Dataframe -> raw csv RDD[String]
But in last lines of that method it's hardcoded against writing as text file -
for our case we need RDD.
*/
object DataframeToRawCsvRDD {
val defaultCsvFormat = com.databricks.spark.csv.defaultCsvFormat
def apply(dataFrame: DataFrame, parameters: Map[String, String] = Map())
(implicit ctx: ExecutionContext): RDD[String] = {
val delimiter = parameters.getOrElse("delimiter", ",")
val delimiterChar = if (delimiter.length == 1) {
delimiter.charAt(0)
} else {
throw new Exception("Delimiter cannot be more than one character.")
}
val escape = parameters.getOrElse("escape", null)
val escapeChar: Character = if (escape == null) {
null
} else if (escape.length == 1) {
escape.charAt(0)
} else {
throw new Exception("Escape character cannot be more than one character.")
}
val quote = parameters.getOrElse("quote", "\"")
val quoteChar: Character = if (quote == null) {
null
} else if (quote.length == 1) {
quote.charAt(0)
} else {
throw new Exception("Quotation cannot be more than one character.")
}
val quoteModeString = parameters.getOrElse("quoteMode", "MINIMAL")
val quoteMode: QuoteMode = if (quoteModeString == null) {
null
} else {
QuoteMode.valueOf(quoteModeString.toUpperCase)
}
val nullValue = parameters.getOrElse("nullValue", "null")
val csvFormat = defaultCsvFormat
.withDelimiter(delimiterChar)
.withQuote(quoteChar)
.withEscape(escapeChar)
.withQuoteMode(quoteMode)
.withSkipHeaderRecord(false)
.withNullString(nullValue)
val generateHeader = parameters.getOrElse("header", "false").toBoolean
val headerRdd = if (generateHeader) {
ctx.sparkContext.parallelize(Seq(
csvFormat.format(dataFrame.columns.map(_.asInstanceOf[AnyRef]): _*)
))
} else {
ctx.sparkContext.emptyRDD[String]
}
val rowsRdd = dataFrame.rdd.map(row => {
csvFormat.format(row.toSeq.map(_.asInstanceOf[AnyRef]): _*)
})
headerRdd union rowsRdd
}
}
私は定義された名前のCSVファイルにデータフレームの内容を保存していた同様の問題がありました。 df.write("csv").save("<my-path>")
がファイル以外のディレクトリを作成していました。だから以下の解決策を考え出す必要があります。 ほとんどのコードは、ロジックにほとんど変更を加えずに、次のdataframe-to-csvから取得しています。
def saveDfToCsv(df: DataFrame, tsvOutput: String, sep: String = ",", header: Boolean = false): Unit = {
val tmpParquetDir = "Posts.tmp.parquet"
df.repartition(1).write.
format("com.databricks.spark.csv").
option("header", header.toString).
option("delimiter", sep).
save(tmpParquetDir)
val dir = new File(tmpParquetDir)
val newFileRgex = tmpParquetDir + File.separatorChar + ".part-00000.*.csv"
val tmpTsfFile = dir.listFiles.filter(_.toPath.toString.matches(newFileRgex))(0).toString
(new File(tmpTsvFile)).renameTo(new File(tsvOutput))
dir.listFiles.foreach(f => f.delete)
dir.delete
}
btwこれは配列を返しませんが、DataFrame! [参考資料](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.GroupedData) – eliasah
もし答えがあなたの質問を解決するなら、それを受け入れてください私たちは解決されたようにこの問題を分類することができます! – eliasah