2017-05-11 11 views
1

私はPythonでプログラミングするのに慣れています。私の会社はJupyterをインストールしたHadoop Clusterを取得しました。今まで私はSpark/Pysparkを何も使用していませんでした。(spark、python、pyspark、jupyter)を使ってHDFSに複数のアイテムを保存する

text_file = sc.textFile("/user/myname/student_grades.txt") 

そして、このような出力を書き込むことができ - 私:私はこれと同じように簡単にHDFSからファイルをロードすることができる午前

text_file.saveAsTextFile("/user/myname/student_grades2.txt") 

- 私が達成しようとの事単純な "forループ"を使用して、テキストファイルを1つずつ読み込み、その内容を1つのHDFSファイルに書き込むことです。だから私はこれを試してみました:

list = ['text1.txt', 'text2.txt', 'text3.txt', 'text4.txt'] 

for i in list: 
    text_file = sc.textFile("/user/myname/" + i) 
    text_file.saveAsTextFile("/user/myname/all.txt") 

は、これは、リストの最初の要素のために動作しますが、その後、私は、このエラーメッセージが表示できます:IP -out I「blured」混乱を避けるために

Py4JJavaError: An error occurred while calling o714.saveAsTextFile. 
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory 
XXXXXXXX/user/myname/all.txt already exists 

をXXXXXXXXの住所


これを行う正しい方法は何ですか? 私は、( 'text1'、 'text2' ...のような)たくさんのデータセットを持ち、それぞれをHDFSに保存する前にそれらのpython関数を実行したいと考えています。しかし、結果をまとめて1つの出力ファイルにまとめたいと思います。

ありがとうございます!
MG

EDIT: 私の最終目標は本当に明確ではなかったようです。関数を各テキストファイルに別々に適用し、出力を既存の出力ディレクトリに追加する必要があります。このような何か:

for i in list: 
    text_file = sc.textFile("/user/myname/" + i) 
    text_file = really_cool_python_function(text_file) 
    text_file.saveAsTextFile("/user/myname/all.txt") 

答えて

0

あなたは、出力ディレクトリ内のすべての部品ファイルを取得する複数のファイルを読み込み、

textfile = sc.textFile(','.join(['/user/myname/'+f for f in list])) 
textfile.saveAsTextFile('/user/myname/all') 

ことによって、それらを保存することができます。

+0

私の最終目標は本当に明確ではなかったようです。関数を各テキストファイルに別々に適用し、出力を既存の出力ディレクトリに追加する必要があります。すべてのテキストファイルに対してEDIT – mgruber

+0

同じ機能を参照してください。 – Suresh

+0

はい、すべてのファイルで同じ機能ですが、各ファイルを個別に処理する必要があるため、前にテキストファイルに参加できません。 – mgruber

1

私はこれをコメントとして投稿したかったが、評判が足りないため、コメントできませんでした。

RDDをデータフレームに変換して、追加モードで書き込む必要があります。データフレームにRDDを変換するには、この答えに見てください。
https://stackoverflow.com/a/39705464/3287419
またはリンクが有用である可能性がより低いhttp://spark.apache.org/docs/latest/sql-programming-guide.html
は追加モードでデータフレームを保存するには、このリンクを:
http://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes

ほぼ同じ質問もここSpark: Saving RDD in an already existing path in HDFSです。しかし、答えはスカラーです。私はPythonでも同様のことができることを願っています。

もう1つ(しかし醜い)アプローチがあります。 RDDを文字列に変換します。結果の文字列をresultStringとします。サブプロセスを使用して、その文字列を宛先ファイルに追加します。

subprocess.call("echo "+resultString+" | hdfs dfs -appendToFile - <destination>", shell=True) 
0

テキストはすべて同じスキーマを持っている、あなたは、単一のテーブルとしてフォルダ全体を読み取るためにHiveを使用し、直接その出力を書くことができファイル場合。

関連する問題