2016-05-29 11 views
4

私はhdfsの場所に火花のデータフレームを書き込もうとしていますが、私が 'partitionBy'表記を追加すると、パーティションは (寄木細工の書式) "partition_column_name = partition_value"の形式のフォルダ (つまりpartition_date=2016-05-03)。 はそうするために、私は、次のコマンドを実行しました:パーティションを持つCSVとして書き込むデータフレーム

df.write.partitionBy('partition_date').mode('overwrite').format("com.databricks.spark.csv").save('/tmp/af_organic')

が、パーティションフォルダが 作成されていない任意のアイデア火花DFが自動的にそれらのフォルダを作成するために、私は順番に行うsould何?

おかげで、

答えて

13

スパーク2.0.0+:あなたは、単に使用することができるはずですので、内蔵csv形式

は、箱から出してパーティショニングをサポートしています。

df.write.partitionBy('partition_date').mode(mode).format("csv").save(path) 

追加のパッケージを含まずに

スパーク< 2.0.0

この時点で(1.4.0)spark-csvpartitionByをサポートしていません(を参照)が、あなたが望むものを達成するために、組み込みのソースを調整することができます。

2つのアプローチを試すことができます。手動書き込みの値を準備することができます

df = sc.parallelize([ 
    ("foo", 1, 2.0, 4.0), ("bar", -1, 3.5, -0.1) 
]).toDF(["k", "x1", "x2", "x3"]) 

from pyspark.sql.functions import col, concat_ws 

key = col("k") 
values = concat_ws(",", *[col(x) for x in df.columns[1:]]) 

kvs = df.select(key, values) 

textを使用して書き込みデータが比較的単純であると仮定すると、(複雑な文字列と文字エスケープのために必要)とは、多かれ少なかれ、このようになりますあなたが同様の方法で前処理値に適切なCSVパーサーを使用しようとすることができ、より複雑なケースでは、ソース

kvs.write.partitionBy("k").text("/tmp/foo") 

df_foo = (sqlContext.read.format("com.databricks.spark.csv") 
    .options(inferSchema="true") 
    .load("/tmp/foo/k=foo")) 

df_foo.printSchema() 
## root 
## |-- C0: integer (nullable = true) 
## |-- C1: double (nullable = true) 
## |-- C2: double (nullable = true) 

、いずれかのUDFを使用するか、オーバーマッピングすることにより、 RDDが、それははるかに高価になります。

CSV形式であなたもpartitionByをサポートJSONライターを使用することができ、ハード要件がない場合には、すぐに使える:

df.write.partitionBy("k").json("/tmp/bar") 

だけでなく、読み取りのパーティション発見。

2

私は次のように使用することをお勧め:

nは、パーティションの数である
df = your dataframe object 
df.coalesce(n).write.csv('name_of_outputfolder',header=True) 

これはすべきことです。それがどうなるか教えてください!

+0

質問には一切触れていません。 – zero323

関連する問題