2016-08-23 24 views
0

とスパークに減らし、私はS3上の異なる場所のデータから別のデータフレームを作成して、単一データフレームにデータフレームをマージしようとしています。今はforループを使っています。しかし、私はマップを使ってはるかに効率的なやり方でpysparkの機能を減らすことができると感じています。 、ここでの最大の問題ではない次善のが、union地図でforループ並列化と私のアプリケーションではpyspark

from pyspark import SparkConf, SparkContext 
from pyspark.sql import SQLContext, GroupedData 
import pandas as pd 
from datetime import datetime 


sparkConf = SparkConf().setAppName('myTestApp') 
sc = SparkContext(conf=sparkConf) 
sqlContext = SQLContext(sc) 

filepath = 's3n://my-s3-bucket/report_date=' 

date_from = pd.to_datetime('2016-08-01',format='%Y-%m-%d') 
date_to = pd.to_datetime('2016-08-22',format='%Y-%m-%d') 
datelist = pd.date_range(date_from, date_to) 

First = True 

#THIS is the for-loop I want to get rid of 
for dt in datelist: 
    date_string = datetime.strftime(dt, '%Y-%m-%d') 
    print('Running the pyspark - Data read for the date - '+date_string) 
    df = sqlContext.read.format("com.databricks.spark.csv").options(header = "false", inferschema = "true", delimiter = "\t").load(filepath + date_string + '/*.gz') 

    if First: 
     First=False 
     df_Full = df 
    else: 
     df_Full = df_Full.unionAll(df) 
+0

あなたは、Sparkを使用するための制約を持っていますか?あなたはdaskを代わりに使用すると考えていないのですか? DASKは、あなたが簡単に私はこの後、私はそれにスパークALGOSを実行していることになるので、火花を使用する必要が – Boud

+0

をやろうとしているものを達成するために、他の素敵なものの間に設計されています。また、データのサイズもかなり大きいです。 – nishant

+0

http://dask.pydata.org/ja/latest/spark.html – Boud

答えて

1

実際の反復:ここに私のコードです。スキーマの推論によってはるかに深刻な問題が発生します(inferschema = "true")。

それは怠惰ではないデータフレームを作成しますが、また、単に推論のための個別のデータのスキャンを必要とするだけでなく。あなたは、フロントアップスキーマを知っていれば、あなたはDataFrameReaderの引数としてそれを提供する必要があります

schema = ... 

df = sqlContext.read.format("com.databricks.spark.csv").schema(schema) 

をそうでなければ、最初DataFrameからそれを抽出することができます。うまくチューニングされた並列処理と組み合わせることで、それがうまく動作するはずですが、あなたはフェッチファイルの数が多い場合、あなたはまた、反復組合より少し賢くアプローチを検討すべきです。私の答えは、Spark union of multiple RDDsです。それはより高価ですが、より良い一般的な特性を持っています。

分散データ構造に操作をネストすることはできないので、map内のデータを読み取る場合は、SQLContextを使わずにS3クライアントを直接使用する必要があります。

関連する問題