スパーク2.0.0+:あなたは、単に使用することができるはずですので、内蔵csv形式
は、箱から出してパーティショニングをサポートしています。
df.write.partitionBy('partition_date').mode(mode).format("csv").save(path)
追加のパッケージを含まずに。
スパーク< 2.0.0:
この時点で(1.4.0)spark-csv
はpartitionBy
をサポートしていません(を参照)が、あなたが望むものを達成するために、組み込みのソースを調整することができます。
2つのアプローチを試すことができます。手動書き込みの値を準備することができます
df = sc.parallelize([
("foo", 1, 2.0, 4.0), ("bar", -1, 3.5, -0.1)
]).toDF(["k", "x1", "x2", "x3"])
:
from pyspark.sql.functions import col, concat_ws
key = col("k")
values = concat_ws(",", *[col(x) for x in df.columns[1:]])
kvs = df.select(key, values)
とtext
を使用して書き込みデータが比較的単純であると仮定すると、(複雑な文字列と文字エスケープのために必要)とは、多かれ少なかれ、このようになりますあなたが同様の方法で前処理値に適切なCSVパーサーを使用しようとすることができ、より複雑なケースでは、ソース
kvs.write.partitionBy("k").text("/tmp/foo")
df_foo = (sqlContext.read.format("com.databricks.spark.csv")
.options(inferSchema="true")
.load("/tmp/foo/k=foo"))
df_foo.printSchema()
## root
## |-- C0: integer (nullable = true)
## |-- C1: double (nullable = true)
## |-- C2: double (nullable = true)
、いずれかのUDFを使用するか、オーバーマッピングすることにより、 RDDが、それははるかに高価になります。
CSV形式であなたもpartitionBy
をサポートJSONライターを使用することができ、ハード要件がない場合には、すぐに使える:
df.write.partitionBy("k").json("/tmp/bar")
だけでなく、読み取りのパーティション発見。
質問には一切触れていません。 – zero323