Improving PySpark performance with DataFrames and SQL

I am currently reading in all of the text files from a public AWS bucket that contains hundreds of CSV files. I read all of the CSV files at once, turn the result into an RDD, and massage the data so that I can save it to Cassandra. Processing all of the text files takes two and a half hours, which is far too long for 100 GB of data. Is there anything I can do in my code to make it faster?
I would appreciate any suggestions. I have also read https://robertovitillo.com/2015/06/30/spark-best-practices/, but I am confused about how to implement advice like "use the right level of parallelism". I tried keeping my RDD in cache with rdd.cache(), but it still took more than two hours. Here is my code:
from operator import itemgetter

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# tofullname/toevent map country and event codes to readable names
from abbreviations_dict import tofullname, toevent

# SPARK_IP and SPARK_PORT are defined elsewhere
conf = SparkConf() \
    .setMaster("spark://%s:%s" % (SPARK_IP, SPARK_PORT))
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

rdd = sc.textFile("s3a://gdelt-open-data/events/*")
def rddCleaning(rd, timeframe):

    def check_valid(tup):
        # Keep only rows whose numeric fields actually parse
        try:
            int(tup[1])
            int(tup[4])
            float(tup[5])
            float(tup[6])
            return True
        except ValueError:
            return False

    def fillin(tup):
        # Fall back to the Actor2 type code when the Actor1 code is empty
        if tup[2] == "" and tup[3] != "":
            return (tup[0], tup[1], tup[3], tup[4], tup[5], tup[6])
        else:
            return (tup[0], tup[1], tup[2], tup[4], tup[5], tup[6])

    def popular_avg(curr_tup):
        # Keep the averages only for the event types that survived the top-5 cut
        lst = curr_tup[1]
        dictionary = curr_tup[2]
        dict_matches = {}
        for tup in lst:
            event_type = tup[0]
            dict_matches[event_type] = dictionary[event_type]
        return (curr_tup[0], lst, dict_matches, curr_tup[3])

    def merge_info(tup):
        # Fold the per-event averages and the article total into one dict
        main_dict = tup[1]
        info_dict = tup[2]
        for key in info_dict:
            main_dict[key].update(info_dict[key])
        main_dict["TotalArticles"] = {"total": tup[3]}
        return (tup[0], main_dict)

    def event_todict(tup):
        lst = tup[1]
        dict_matches = {}
        for event_tup in lst:
            dict_matches[event_tup[0]] = {"ArticleMentions": event_tup[1]}
        return (tup[0], dict_matches, tup[2], tup[3])

    def sum_allevents(tup):
        # Total article mentions across all event types for this key
        type_lst = tup[1]
        total_mentions = 0
        for event in type_lst:
            total_mentions += event[1]
        return (tup[0], type_lst, tup[2], total_mentions)

    # Column indices in the tab-separated GDELT event records
    actionGeo_CountryCode = 51
    actor1Type1Code = 12
    actor2Type1Code = 22
    numArticles = 33
    goldsteinScale = 30
    avgTone = 34
    if timeframe == "SQLDATE":
        time = 1
    elif timeframe == "MonthYear":
        time = 2
    else:  # "Year"
        time = 3
    rdd_reduce = rd.map(lambda x: x.split('\t')) \
                   .map(lambda y: (y[actionGeo_CountryCode],
                                   y[time],
                                   y[actor1Type1Code],
                                   y[actor2Type1Code],
                                   y[numArticles],
                                   y[goldsteinScale],
                                   y[avgTone])) \
                   .filter(check_valid) \
                   .map(lambda c: (c[0], int(c[1]), c[2], c[3],
                                   int(c[4]), int(float(c[5])), int(float(c[6])))) \
                   .map(fillin) \
                   .filter(lambda r: r[0] in tofullname and r[2] in toevent
                                     and r[2] != "" and r[0] != "") \
                   .map(lambda t: (tofullname[t[0]], t[1], toevent[t[2]], t[3], t[4], t[5])) \
                   .map(lambda f: ((f[0], f[1], f[2]), (f[3], f[4], f[5], 1))) \
                   .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1], a[2]+b[2], a[3]+b[3])) \
                   .map(lambda s: (s[0], (s[1][0], s[1][1]/s[1][3], s[1][2]/s[1][3])))
    rdd_format = rdd_reduce.map(lambda t: ((t[0][0], t[0][1]),
                                           ([(t[0][2], t[1][0])],
                                            [(t[0][2], {"GoldsteinScaleAvg": t[1][1],
                                                        "ToneAvg": t[1][2]})]))) \
                           .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1])) \
                           .map(lambda v: (v[0],
                                           sorted(v[1][0], key=itemgetter(1), reverse=True),
                                           v[1][1])) \
                           .map(sum_allevents) \
                           .map(lambda f: (f[0], f[1][:5], dict(f[2]), f[3])) \
                           .map(popular_avg) \
                           .map(event_todict) \
                           .map(merge_info) \
                           .map(lambda d: (d[0][0], d[0][1], d[1]))
    return rdd_format
daily_rdd = rddCleaning(rdd, "SQLDATE")
print(daily_rdd.take(6))
monthly_rdd = rddCleaning(rdd, "MonthYear")
print(monthly_rdd.take(6))
yearly_rdd = rddCleaning(rdd, "Year")
print(yearly_rdd.take(6))
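For reference, "use the right level of parallelism" from the linked post usually comes down to setting partition counts explicitly instead of relying on the defaults, and cache() only pays off once the same RDD is reused by more than one action. A minimal sketch of both ideas applied here, assuming a partition count of 400 that you would tune to your cluster size:

from pyspark import StorageLevel

# Ask for more input splits up front rather than repartitioning later
rdd = sc.textFile("s3a://gdelt-open-data/events/*", minPartitions=400)

# The same RDD feeds three rddCleaning calls, so persist it once;
# MEMORY_AND_DISK spills to disk instead of recomputing from S3
rdd.persist(StorageLevel.MEMORY_AND_DISK)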
Edit: I made the following changes to my code, and it improved performance, but it is still taking a long time. I think this is happening because every time I use df it reads all of the files from the S3 bucket again. Should I put some of the dfs and temp tables in cache? That is all I can think of:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import udf, col
from abbreviations_dict import tofullname, toevent
from operator import itemgetter
import pyspark_cassandra

sc = SparkContext()
sqlContext = SQLContext(sc)
customSchema = StructType([
    StructField('GLOBALEVENTID', StringType(), True),
    StructField('SQLDATE', StringType(), True),
    StructField('MonthYear', StringType(), True),
    StructField('Year', StringType(), True),
    StructField('FractionDate', StringType(), True),
    StructField('Actor1Code', StringType(), True),
    StructField('Actor1Name', StringType(), True),
    StructField('Actor1CountryCode', StringType(), True),
    StructField('Actor1KnownGroupCode', StringType(), True),
    StructField('Actor1EthnicCode', StringType(), True),
    StructField('Actor1Religion1Code', StringType(), True),
    StructField('Actor1Religion2Code', StringType(), True),
    StructField('Actor1Type1Code', StringType(), True),
    StructField('Actor1Type2Code', StringType(), True),
    StructField('Actor1Type3Code', StringType(), True),
    StructField('Actor2Code', StringType(), True),
    StructField('Actor2Name', StringType(), True),
    StructField('Actor2CountryCode', StringType(), True),
    StructField('Actor2KnownGroupCode', StringType(), True),
    StructField('Actor2EthnicCode', StringType(), True),
    StructField('Actor2Religion1Code', StringType(), True),
    StructField('Actor2Religion2Code', StringType(), True),
    StructField('Actor2Type1Code', StringType(), True),
    StructField('Actor2Type2Code', StringType(), True),
    StructField('Actor2Type3Code', StringType(), True),
    StructField('IsRootEvent', StringType(), True),
    StructField('EventCode', StringType(), True),
    StructField('EventBaseCode', StringType(), True),
    StructField('EventRootCode', StringType(), True),
    StructField('QuadClass', StringType(), True),
    StructField('GoldsteinScale', StringType(), True),
    StructField('NumMentions', StringType(), True),
    StructField('NumSources', StringType(), True),
    StructField('NumArticles', StringType(), True),
    StructField('AvgTone', StringType(), True),
    StructField('Actor1Geo_Type', StringType(), True),
    StructField('Actor1Geo_FullName', StringType(), True),
    StructField('Actor1Geo_CountryCode', StringType(), True),
    StructField('Actor1Geo_ADM1Code', StringType(), True),
    StructField('Actor1Geo_Lat', StringType(), True),
    StructField('Actor1Geo_Long', StringType(), True),
    StructField('Actor1Geo_FeatureID', StringType(), True),
    StructField('Actor2Geo_Type', StringType(), True),
    StructField('Actor2Geo_FullName', StringType(), True),
    StructField('Actor2Geo_CountryCode', StringType(), True),
    StructField('Actor2Geo_ADM1Code', StringType(), True),
    StructField('Actor2Geo_Lat', StringType(), True),
    StructField('Actor2Geo_Long', StringType(), True),
    StructField('Actor2Geo_FeatureID', StringType(), True),
    StructField('ActionGeo_Type', StringType(), True),
    StructField('ActionGeo_FullName', StringType(), True),
    StructField('ActionGeo_CountryCode', StringType(), True),
    StructField('ActionGeo_ADM1Code', StringType(), True),
    StructField('ActionGeo_Lat', StringType(), True),
    StructField('ActionGeo_Long', StringType(), True),
    StructField('ActionGeo_FeatureID', StringType(), True),
    StructField('DATEADDED', StringType(), True),
    StructField('SOURCEURL', StringType(), True)])
df = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='false') \
    .options(delimiter="\t") \
    .load('s3a://gdelt-open-data/events/*', schema=customSchema)
def modify_values(r, y):
    # Prefer the Actor1 value; fall back to Actor2's when it is empty
    if r == '' and y != '':
        return y
    else:
        return r

def country_exists(r):
    # Map a country code to its full name, or '' when unknown
    if r in tofullname:
        return tofullname[r]
    else:
        return ''

def event_exists(r):
    # Map an event type code to its description, or '' when unknown
    if r in toevent:
        return toevent[r]
    else:
        return ''

modify_val = udf(modify_values, StringType())
c_exists = udf(country_exists, StringType())
e_exists = udf(event_exists, StringType())
dfsub1 = df.withColumn("Actor1Type1Code", modify_val(col("Actor1Type1Code"), col("Actor2Type1Code"))) \
           .withColumn("ActionGeo_CountryCode", c_exists(col("ActionGeo_CountryCode"))) \
           .withColumn("Actor1Type1Code", e_exists(col("Actor1Type1Code")))
sqlContext.registerDataFrameAsTable(dfsub1, 'temp')
df2 = sqlContext.sql("""SELECT ActionGeo_CountryCode,
SQLDATE, MonthYear, Year,
Actor1Type1Code,
NumArticles,
GoldsteinScale,
AvgTone
FROM temp
WHERE ActionGeo_CountryCode <> ''
AND Actor1Type1Code <> ''
AND NumArticles <> ''
AND GoldsteinScale <> ''
AND AvgTone <> ''""")
sqlContext.registerDataFrameAsTable(df2, 'temp2')
df3 = sqlContext.sql("""SELECT ActionGeo_CountryCode,
CAST(SQLDATE AS INTEGER), CAST(MonthYear AS INTEGER), CAST(Year AS INTEGER),
Actor1Type1Code,
CAST(NumArticles AS INTEGER),
CAST(GoldsteinScale AS INTEGER),
CAST(AvgTone AS INTEGER)
FROM temp2""")
sqlContext.registerDataFrameAsTable(df3, 'temp3')
sqlContext.cacheTable('temp3')
dfdaily = sqlContext.sql("""SELECT ActionGeo_CountryCode,
SQLDATE,
Actor1Type1Code,
SUM(NumArticles) AS NumArticles,
ROUND(AVG(GoldsteinScale),0) AS GoldsteinScale,
ROUND(AVG(AvgTone),0) AS AvgTone
FROM temp3
GROUP BY ActionGeo_CountryCode,
SQLDATE,
Actor1Type1Code""")
dfmonthly = sqlContext.sql("""SELECT ActionGeo_CountryCode,
MonthYear,
Actor1Type1Code,
SUM(NumArticles) AS NumArticles,
ROUND(AVG(GoldsteinScale),0) AS GoldsteinScale,
ROUND(AVG(AvgTone),0) as AvgTone
FROM temp3
GROUP BY ActionGeo_CountryCode,
MonthYear,
Actor1Type1Code""")
dfyearly = sqlContext.sql("""SELECT ActionGeo_CountryCode,
Year,
Actor1Type1Code,
SUM(NumArticles) AS NumArticles,
ROUND(AVG(GoldsteinScale),0) AS GoldsteinScale,
ROUND(AVG(AvgTone),0) as AvgTone
FROM temp3
GROUP BY ActionGeo_CountryCode,
Year,
Actor1Type1Code""")
def rddCleaning(rd, timeframe):

    def popular_avg(curr_tup):
        # Keep the averages only for the event types still in the list
        lst = curr_tup[1]
        dictionary = curr_tup[2]
        dict_matches = {}
        for tup in lst:
            event_type = tup[0]
            dict_matches[event_type] = dictionary[event_type]
        return (curr_tup[0], lst, dict_matches, curr_tup[3])

    def merge_info(tup):
        # Fold the per-event averages and the article total into one dict
        main_dict = tup[1]
        info_dict = tup[2]
        for key in info_dict:
            main_dict[key].update(info_dict[key])
        main_dict["TotalArticles"] = {"total": tup[3]}
        return (tup[0], main_dict)

    def event_todict(tup):
        lst = tup[1]
        dict_matches = {}
        for event_tup in lst:
            dict_matches[event_tup[0]] = {"ArticleMentions": event_tup[1]}
        return (tup[0], dict_matches, tup[2], tup[3])

    def sum_allevents(tup):
        # Total article mentions across all event types for this key
        type_lst = tup[1]
        total_mentions = 0
        for event in type_lst:
            total_mentions += event[1]
        return (tup[0], type_lst, tup[2], total_mentions)

    rdd_format = rd.map(lambda y: ((y["ActionGeo_CountryCode"], y[timeframe]),
                                   ([(y["Actor1Type1Code"], y["NumArticles"])],
                                    [(y["Actor1Type1Code"], {"Goldstein": y["GoldsteinScale"],
                                                             "ToneAvg": y["AvgTone"]})]))) \
                   .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1])) \
                   .map(lambda v: (v[0],
                                   sorted(v[1][0], key=itemgetter(1), reverse=True),
                                   dict(v[1][1]))) \
                   .map(sum_allevents) \
                   .map(popular_avg) \
                   .map(event_todict) \
                   .map(merge_info) \
                   .map(lambda d: (d[0][0], d[0][1], d[1]))
    return rdd_format
print("THIS IS THE FIRST ONE ######################################################")
daily_rdd = rddCleaning(dfdaily.rdd,"SQLDATE")
print(daily_rdd.take(5))
print("THIS IS THE SECOND ONE ######################################################")
monthly_rdd = rddCleaning(dfmonthly.rdd,"MonthYear")
print(monthly_rdd.take(5))
print("THIS IS THE THIRD ONE ######################################################")
yearly_rdd = rddCleaning(dfyearly.rdd,"Year")
print(yearly_rdd.take(5))
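On the caching question: sqlContext.cacheTable('temp3') is already in place, but the cache is lazy, so nothing is materialized until the first action runs against it, and only queries that run afterwards benefit. A minimal sketch of forcing materialization once, up front (the COUNT(*) query is just a cheap action to trigger it):

# Cache is declared above with sqlContext.cacheTable('temp3'); run one action
# against the table so the three GROUP BY queries read from memory, not S3
sqlContext.sql("SELECT COUNT(*) FROM temp3").collect()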
First of all, lose all the udfs. You don't need any of these. – zero323
Thank you so much! I really appreciate it. I used udfs because I don't know how to use Python dictionaries in SQL. With two of the udfs I am renaming values based on a dictionary stored in a separate python file. For example, if a cell in a column has "US", the udf renames it to "United States". Can I use dictionary variables in SQL? – CatherineAlv
You can use 'join' with 'broadcast' or a literal map (http://stackoverflow.com/a/32788650/1560062). – zero323
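To make that suggestion concrete, here is a minimal sketch of the broadcast-join variant, with the lookup dictionary turned into a small DataFrame (the column names code and fullname are made up for the example):

from pyspark.sql.functions import broadcast

# Ship the lookup table to every executor and join, instead of calling a
# Python UDF once per row
names_df = sqlContext.createDataFrame(list(tofullname.items()), ["code", "fullname"])
df_named = df.join(broadcast(names_df),
                   df["ActionGeo_CountryCode"] == names_df["code"]) \
             .drop("ActionGeo_CountryCode") \
             .drop("code") \
             .withColumnRenamed("fullname", "ActionGeo_CountryCode")

The inner join also replaces the ActionGeo_CountryCode <> '' filter, since rows whose code is missing from the dictionary simply drop out.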