2017-11-29 9 views
1

私はPySpark 2.1を使用しています。特殊なフォーマットの.txtファイルにデータフレームを書き込む方法を考え出す必要があります。典型的なjsonやcsvではなく、CTF形式(CNTKの場合)です。PySpark DFを特殊フォーマットのファイルに書き込む

ファイルには、それがフォームを次のなどの余分な括弧やカンマを持つことはできません。

|label val |features val val val ... val 
|label val |features val val val ... val 

次のように、これはあるかもしれない示すためにいくつかのコードを:

l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)] 
rdd = sc.parallelize(l) 
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1]))).toDF() 
people.show(n=4) 

def Convert_to_String(r): 
    return '|label ' + r.name + ' ' + '|features ' + str(r.age) + '\n' 

m_p = people.rdd.map(lambda r: Row(Convert_to_String(r))).toDF() 
m_p.show(n=3) 

上記の例では、私が考え余分な文字がないファイルに各行の各文字列を追加するだけです。

実際のデータフレームはかなり大きいです。それが複数のファイルに分割されている可能性があります。結果が単一のファイルであれば好ましいでしょう。

洞察力は非常に役に立ちます。

感謝!

+0

単に 'm_p.saveAsTextFile(path)'を呼び出すことはできませんか?出力は複数のファイルに分割される可能性がありますが、それらを非常に簡単に連結することができます。私はテキストファイルをHDFSに書き込んで、後で 'hadoop fs -cat path/*> combined.txt'を使ってそれらを組み合わせるのと同じようなことをやっていました。 – pault

+0

は私がそれをすると空を保存します。 m_p.rdd.saveAsTextFile( "wasb://[email protected]/traindata/train_test.ctf") –

+0

Hmm。私はちょうどあなたのマップ関数で、行を文字列に変換してからDFに変換するために 'Row'に戻ったことに気付きました。 m_p = people.rdd.flatMap(lambda r:Convert_to_String(r)) ' – pault

答えて

1

私のコメントを回答に変換します。

各レコードをに変換し、toDF()を呼び出す代わりに、各レコードを文字列にマップするだけです。その後、saveAsTextFile()に電話してください。

path = 'path/to/output/file' 

# depending on your data, you may need to call flatMap 
m_p = people.rdd.flatMap(lambda r: Convert_to_String(r)) 

# now m_p will contain a list of strings that you can write to a file 
m_p.saveAsTextFile(path) 

データは複数のファイルに保存される可能性がありますが、コマンドラインからそれらを連結することもできます。コマンドは次のようになります:

hadoop fs -cat path/to/output/file/* > combined.txt 
関連する問題