2015-10-16 5 views
7

Spark DataFrameをディスク上にcsvとして保存するには?アレイを返すこの</p> <pre><code>df.filter("project = 'en'").select("title","count").groupBy("title").sum() </code></pre> <p>の実施結果に対する

spark DataFrameをcsvとしてディスクに保存する方法はありますか?

+1

btwこれは配列を返しませんが、DataFrame! [参考資料](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.GroupedData) – eliasah

+0

もし答えがあなたの質問を解決するなら、それを受け入れてください私たちは解決されたようにこの問題を分類することができます! – eliasah

答えて

13

Apache Sparkはディスク上のネイティブCSV出力をサポートしていません。

あなたはしかし、4つの利用可能な解決策を持っている:

  1. あなたはあなたのデータフレームは、RDDに変換することができます

    def convertToReadableString(r : Row) = ??? 
    df.rdd.map{ convertToReadableString }.saveAsTextFile(filepath) 
    

    これは、フォルダのファイルパスを作成します。ファイルパスの下には、パーティションのファイル(例えば一部-000 *)私は大きなCSVにすべてのパーティションを追加したい場合は、私は通常、何

    cat filePath/part* > mycsvfile.csv 
    

    一部を使用するあるを見つけることができますcoalesce(1,false) RDDから1つのパーティションを作成します。それはあなたが収集しているすべてのデータをプルすることによってドライバを圧倒する可能性があるので、通常悪い練習です。

    df.rddRDD[Row]を返します。

  2. あなたはDatabricks火花CSV library使用することができます。

    • スパーク1.4+:

      df.write.format("com.databricks.spark.csv").save(filepath) 
      
    • スパーク1.3:

      df.save(filepath,"com.databricks.spark.csv") 
      
  3. ウィットをh Spark 2.xspark-csvパッケージはSparkに含まれているので不要です。

    df.write.format("csv").save(filepath) 
    
  4. あなたは地元のパンダのデータフレームに変換し、to_csv方法を使用することができます(PySparkのみ)。

注:ソリューション1、2及び3は、あなたがsaveを呼び出す際の呼び出しをスパーク基礎となるのHadoop APIによって生成されたCSV形式ファイル(part-*)になります。 1つのパーティションに1つのpart-ファイルがあります。

+1

私は 'spark-csv'が望ましい解決策だと思います。正しいCSVラインを作成するのは簡単ではありません。すべての方言と適切なエスケープは非常に難しいことがあります。 – zero323

+0

私は完全に同意します – eliasah

+1

PySparkでは、小さなテーブルをPandasに変換してローカルに保存することもできます。おそらくScalaの質問でしょう。 – zero323

0

同様の問題がありました。私は、クライアントモードでクラスタに接続している間、ドライバにcsvファイルを書き留める必要がありました。

潜在的なエラーを避けるために、Apache Sparkと同じCSV解析コードを再利用したいと思っていました。

私はspark-csvコードをチェックして、データフレームをraw csv RDD[String]に変換するコードがcom.databricks.spark.csv.CsvSchemaRDDであることを発見しました。

悲しいことに、悲しいことに、sc.textFileと関連するメソッドの最後でハードコードされています。

そのコードをコピーして最後の行をsc.textFileで削除し、代わりにRDDを直接返しました。

マイコード:

/* 
    This is copypasta from com.databricks.spark.csv.CsvSchemaRDD 
    Spark's code has perfect method converting Dataframe -> raw csv RDD[String] 
    But in last lines of that method it's hardcoded against writing as text file - 
    for our case we need RDD. 
*/ 
object DataframeToRawCsvRDD { 

    val defaultCsvFormat = com.databricks.spark.csv.defaultCsvFormat 

    def apply(dataFrame: DataFrame, parameters: Map[String, String] = Map()) 
      (implicit ctx: ExecutionContext): RDD[String] = { 
    val delimiter = parameters.getOrElse("delimiter", ",") 
    val delimiterChar = if (delimiter.length == 1) { 
     delimiter.charAt(0) 
    } else { 
     throw new Exception("Delimiter cannot be more than one character.") 
    } 

    val escape = parameters.getOrElse("escape", null) 
    val escapeChar: Character = if (escape == null) { 
     null 
    } else if (escape.length == 1) { 
     escape.charAt(0) 
    } else { 
     throw new Exception("Escape character cannot be more than one character.") 
    } 

    val quote = parameters.getOrElse("quote", "\"") 
    val quoteChar: Character = if (quote == null) { 
     null 
    } else if (quote.length == 1) { 
     quote.charAt(0) 
    } else { 
     throw new Exception("Quotation cannot be more than one character.") 
    } 

    val quoteModeString = parameters.getOrElse("quoteMode", "MINIMAL") 
    val quoteMode: QuoteMode = if (quoteModeString == null) { 
     null 
    } else { 
     QuoteMode.valueOf(quoteModeString.toUpperCase) 
    } 

    val nullValue = parameters.getOrElse("nullValue", "null") 

    val csvFormat = defaultCsvFormat 
     .withDelimiter(delimiterChar) 
     .withQuote(quoteChar) 
     .withEscape(escapeChar) 
     .withQuoteMode(quoteMode) 
     .withSkipHeaderRecord(false) 
     .withNullString(nullValue) 

    val generateHeader = parameters.getOrElse("header", "false").toBoolean 
    val headerRdd = if (generateHeader) { 
     ctx.sparkContext.parallelize(Seq(
     csvFormat.format(dataFrame.columns.map(_.asInstanceOf[AnyRef]): _*) 
    )) 
    } else { 
     ctx.sparkContext.emptyRDD[String] 
    } 

    val rowsRdd = dataFrame.rdd.map(row => { 
     csvFormat.format(row.toSeq.map(_.asInstanceOf[AnyRef]): _*) 
    }) 

    headerRdd union rowsRdd 
    } 

} 
0

私は定義された名前のCSVファイルにデータフレームの内容を保存していた同様の問題がありました。 df.write("csv").save("<my-path>")がファイル以外のディレクトリを作成していました。だから以下の解決策を考え出す必要があります。 ほとんどのコードは、ロジックにほとんど変更を加えずに、次のdataframe-to-csvから取得しています。

def saveDfToCsv(df: DataFrame, tsvOutput: String, sep: String = ",", header: Boolean = false): Unit = { 
    val tmpParquetDir = "Posts.tmp.parquet" 

    df.repartition(1).write. 
     format("com.databricks.spark.csv"). 
     option("header", header.toString). 
     option("delimiter", sep). 
     save(tmpParquetDir) 

    val dir = new File(tmpParquetDir) 
    val newFileRgex = tmpParquetDir + File.separatorChar + ".part-00000.*.csv" 
    val tmpTsfFile = dir.listFiles.filter(_.toPath.toString.matches(newFileRgex))(0).toString 
    (new File(tmpTsvFile)).renameTo(new File(tsvOutput)) 

    dir.listFiles.foreach(f => f.delete) 
    dir.delete 
    }