2017-04-17 15 views
0

Spark DFにロードする前にロードして前処理する必要があるExcelファイルがいくつかあります。私はこれらのファイルのリストを処理する必要があります。 (パス、パンダDATAFRAME)タプルにタプル複数のパンダDFを1つのスパークDFに変換する方法は?

file_list_rdd = sc.emptyRDD() 

for file_path in file_list: 
    current_file_rdd = sc.binaryFiles(file_path) 
    print(current_file_rdd.count()) 
    file_list_rdd = file_list_rdd.union(current_file_rdd) 

私は、(パス、バイト)のセットからfile_list_rddをオン一部のマッパー機能を持っている:私はそれらを読むためにこのような何かを行います。これにより、私はPandasを使ってExcelファイルを読み込み、ファイルを操作してSpark DataFrameにする前に統一しています。

(ファイルパス、パンダDF)タプルのRDDをどのようにして1つのSpark DFにするのですか?私は、単一の変換を行うことができる関数を認識していますが、いくつかの変換を行うことはできません。

私の最初の試みは、このようなものだった:(スタックトレースはあまりをしないので、それは推測だ私はsqlCtxは、計算と一緒に配布されていないため、動作しませんでした推測している

sqlCtx = SQLContext(sc) 

def convert_pd_df_to_spark_df(item): 
    return sqlCtx.createDataFrame(item[0][1]) 

processed_excel_rdd.map(convert_pd_df_to_spark_df) 

私に感謝する)。

読んでいただきありがとうございます:)。

+0

残念ながら私は何千ものExcelファイルを与えられています。 – bstempi

答えて

0

なぜデータフレームまたはファイル名のリストを作成してから、ループ内でユニオンを呼び出してください。このような何か:

もしパンダのデータフレーム:

dfs = [df1, df2, df3, df4] 
sdf = None 
for df in dfs: 
    if sdf: 
     sdf = sdf.union(spark.createDataFrame(df)) 
    else: 
     sdf = spark.createDataFrame(df) 

ファイル名場合:

names = [name1, name2, name3, name4] 
sdf = None 
for name in names: 
    if sdf: 
     sdf = sdf.union(spark.createDataFrame(pd.read_excel(name)) 
    else: 
     sdf = spark.createDataFrame(pd.read_excel(name)) 
+0

何千ものExcelファイルがある場合、これは遅くなります。私のアプローチの初めの目標は、Sparkの並列化を活用しようとすることでした。私がforループでこれを行うことができれば、私はSparkをまったく必要としません。私の場合は、数ギガバイトのExcelデータです。 – bstempi

+0

@ zero323 'processed_excel_rdd'の' flatMap(lambda x:x [1] .values) 'を呼び出すと、期待どおりのnd_arrayオブジェクトが得られます。 nd_arrayオブジェクトでいっぱいのRDD上で 'toDF'を呼び出すと、このスタックトレースが発生します:https://pastebin.com/nX4gkXvg – bstempi

0

私はこのような関数を書くことで、これを解決するには:

def pd_df_to_row(rdd_row): 
    key = rdd_row[0] 
    pd_df = rdd_row[1]   

    rows = list() 
    for index, series in pd_df.iterrows(): 
     # Takes a row of a df, exports it as a dict, and then passes an unpacked-dict into the Row constructor 

     row_dict = {str(k):v for k,v in series.to_dict().items()} 
     rows.append(Row(**row_dict)) 

    return rows 

あなたはそれを呼び出すことができます次のように呼び出すことによって:

processed_excel_rdd = processed_excel_rdd.flatMap(pd_df_to_row) 

pd_df_to_rowには、Spark オブジェクトのコレクションが含まれています。あなたは今言うことができる:

processed_excel_rdd.toDF() 

Seriesよりも効率的なものはおそらくあります - >dict - > 操作は、これは私を通じました。

関連する問題