2016-12-19 23 views
2

df.write.csvを使用してCSVファイルにデータを追加しようとしています。これは私がスパーク文書http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriterに従った後やったことです:APPEND、エラーなしpysparkでdf.write.csvを使用してcsvファイルに追加するにはどうすればよいですか?

NameError: name 'append' not defined

:上記のコードを実行

from pyspark.sql import DataFrameWriter 
..... 
df1 = sqlContext.createDataFrame(query1) 
df1.write.csv("/opt/Output/sqlcsvA.csv", append) #also tried 'mode=append' 

は私にエラーを与える

The path already exists.

+0

sqlcsvA.csvというファイルがありますか? –

+0

はい出力は 'sqlcsvA.csv'ファイルにコピーされます。 – kaks

+0

コードからこのファイルを削除して再度作成できますか? –

答えて

0

私は、Pythonについてありませんしかし、ScalaとJavaでは、以下の方法で保存モードを設定できます:

df.write.mode("append").csv("pathToFile") 

私はそれがPythonで似ているはずだと仮定します。 Thisが役に立ちます。 V1.4

例えば

csv(path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None)

ので https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter:ドキュメントから

+0

私はPythonであなたが言ったことを試みました。しかし、私の出力の各行は、 'sqlcsvA.csv'という一つのフォルダにある別々のcsvファイルにコピーされます。それらは単一のcsvファイルにコピーされません。 – kaks

+1

@kaks、それらのファイルを手動でマージする必要があるようです。この[質問](http://stackoverflow.com/questions/31674530/write-single-csv-file-using-spark-csv)を見てください。例えば、人々は[FileUtil.copyMerge](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/fs/FileUtil.html#copyMerge(org.apache.hadoop.fs。 FileSystem、%20org.apache.hadoop.fs.Path、%20org.apache.hadoop.fs.FileSystem、%20org.apache.hadoop.fs.Path、%20boolean、%20org.apache.hadoop.conf.Configuration、% 20java.lang.String))をJavaで使用します。 –

+0

@kaks(Sparkで結果を読み戻すと、それらのファイルがマージされ、そのディレクトリ内のすべてのファイルのデータを含むDataFrameがあることに注意してください。 –

3
df.write.save(path='csv', format='csv', mode='append', sep='\t') 
+0

これは、出力を別のファイルに分割します。それは分割されます。 – kaks

+2

書き込み前に '.coalesce(1)'をインクルードすると、パーティショニングを防ぐことができます。結果が追加されるかどうかはわかりません! 'df.coalesce(1).write.save(パス= 'csv'、フォーマット= 'csv'、モード= '追加'、sep = '\ t')' – Jarek

+0

ありがとうございます。それはすべてを1つのファイルにまとめました。 – kaks

2

あなたが1つのファイルを書き込みたい場合は

from pyspark.sql import DataFrameWriter 
..... 
df1 = sqlContext.createDataFrame(query1) 
df1.write.csv(path="/opt/Output/sqlcsvA.csv", mode="append") 

、あなたはそれらの行のいずれかにCOALESCEまたはrepartitionを使用することができます。データフレームは単にDAGの実行であるため、どのラインでも問題ありません.csvへの書き込みまでは実行されません。 repartition &​​3210は効果的に同じコードを使用しますが、合体はパーティションの数を減らすだけで、repartitionもそれらを増やすことができます。わかりやすくするためにrepartitionに固執しています。

df1 = sqlContext.createDataFrame(query1).repartition(1) 

または

df1.repartition(1).write.csv(path="/opt/Output/sqlcsvA.csv", mode="append") 

私はドキュメントの例は素晴らしいではありません、彼らはパス以外のパラメータの使用例を示していないと思います。あなたがしようとした二つのものを参照する

:仕事にそのために

(append)

は、「追加」値を含むAPPENDという名前の文字列変数があることが必要となります。 appendというDataFrameWriterライブラリには文字列定数はありません。 つまり、これをコードの前半に追加すると、それが機能します。 APPEND =仕事にそのために

('mode=append')

は、CSV形式の方法はあなただけ持つことができたときに余分な作業になりモードの値を、取得するために mode=append文字列を解析しなければならない「追加」パラメータは、抽出する必要のある値「append」または「overwrite」とまったく同じです。特別なケースはありません。Pythonが組み込まれており、pysparkに特有のものではありません。

可能であれば、名前付きパラメータを使用することをお勧めします。例: 代わりに位置パラメータ

csv("/path/to/file.csv", "append") 

csv(path="/path/to/file.csv", mode="append") 

それは明確だし、理解を助けます。

関連する問題