2017-04-16 30 views
0

私は、uni_keyとcreatedDateの2つの列を含むデータフレームで作業しています。私はSQLクエリを実行し、結果を 'a'に保存して、これらの結果をcsvファイルに保存します。これを行う方法はありますか?spark 1.6でDataFrameをcsvファイルに保存する方法は?

import sys 
from pyspark import SparkContext 
from pyspark.sql import SQLContext, Row 
import csv 



if len(sys.argv) != 2: 

    print("Usage: cleaning_date.py <file>") 
    exit(-1) 
sc = SparkContext() 
sqlContext = SQLContext(sc) 
csvfile = sc.textFile("new_311.csv") 
key_val = csvfile.mapPartitions(lambda x: csv.reader(x)).map(lambda x: (x[0], x[1])) 
result = key_val.filter(lambda x: getDataType(x[1]) == "valid") 
myDataFrame = sqlContext.createDataFrame(result, ('uni_key', 'createdDate')) 
print(type(myDataFrame)) 
myDataFrame.registerTempTable("abc") 
a = sqlContext.sql("SELECT uni_key FROM abc") 
a.show() 
# a.write.format("com.databricks.spark.csv").save("l.csv") 
a.write.option("header", "true").csv("aaa.csv") 
sc.stop() 

このコードは今、次のエラーを与える:

はAttributeError:ここでは、コードスニペットはの 'DataFrameWriter' オブジェクトには属性 'CSV' を持っていない

+0

'a.write.format(" com.databricks.spark.csv ")。save(" l.csv ")'、not working? – Pushkr

答えて

2

組み込みCSVライターは、Spark 2.0で導入されています あなたはSpark 1.xを使います。

どちらかがspark-csvを使用します。最新のバージョンに

df.write.format("com.databricks.spark.csv").save(path) 

または更新スパーク。

+0

ありがとう@ user7875578、私は火花2に更新し、それは働いた。 :) – Tannavee

+0

PySpark '1.6.1'のために何をすべきですか? 'df.write.format(" com.databricks.spark.csv ")を実行しているときsave(path)'私が見ている 'java.lang.ClassNotFoundException:データソースの検索に失敗しました:com.databricks.spark.csv .' – Candic3

+0

@ Candic3このコマンドを使って起動するpyspark pyspark --packages com.databricks:spark-csv_2.10:1.4.0 – user2017

関連する問題