2016-10-04 14 views
3

私はpyspark 2.0、hadoop 2.7.2で動作します。RDDを1つの寄木細工ファイルに保存するにはどうすればよいですか?

def func(df): 
    new_df = pd.DataFrame(df['id']) 
    new_df['num'] = new_df['num'] * 12 
    return new_df 

set = sqlContext.read.parquet("data_set.parquet") 
columns = set.columns 
map_res = set.rdd.mapPartitions(lambda iter_: func(pd.DataFrame(list(iter_), 
                columns=columns))) 

は今、私は寄木細工のファイルnew.parquetとしてmap_res RDDを保存する必要があります。 そしてここでは私のコードです。 保存する前に大規模なデータフレームを作成せずに何かできますか?または、RDDの各パーティションを個別に保存してから、保存したすべてのファイルをマージする可能性がありますか?

p.s.私は本当に大きなサイズのためにデータフレームを作成せずに管理したいです。

+0

@サントンすべての単一のデータフレームをスキーマを保存した大きなものにマージする必要があるようです。 RDDの要素としてそれらを保持すると、DataFrameのように結果を操作することはできません。 –

+0

@ИванСудос正しいので、すべてのデータを1つのノードに移動する必要はありません –

+0

@santonパイプラインを作成すると、パラメータとして1つの寄せ集めファイルが扱いやすくなります –

答えて

2

これを行うための唯一の2通りの方法があります。

一つは、これは、すべてのデータが1つのファイルではなく、複数のファイルに保存されていることを確認します"coalesce(1)" を使用している(200パーティションのスパークデフォルトnoです)の使用は、 dataframe.write.save("/this/is/path")

もう1つのオプションは、出力をハイブテーブルに書き込んだり、タブで区切られたhive -e "select * from table" > data.tsvを使用することです。

1

私はこの提案:あなたは、パーティションの数が少ない(2-100)を持っている場合、それはかなり速く動作するはず

dataframes = [] 
#creating index 
map_res = map_res.zipWithIndex() 
# setting index as key 
map_res = map_res.map(lambda x: (x[1],x[0])) 
# creating one spark df per element 
for i in range(0, map_res.count()): 
    partial_dataframe_pd = map_res.lookup(i) 
    partial_dataframe = sqlContext.createDataFrame(partial_dataframe_pd) 
    dataframes.append(partial_dataframe) 
# concatination 
result_df = dataframes.pop() 
for df in dataframes: 
    result_df.union(df) 
#saving 
result_df.write.parquet("...") 

を。

関連する問題