2017-02-13 7 views
1

現在、何百ものCSVファイルを含むパブリックAWSバケットのテキストファイルをすべて読み込んでいます。私は一度にすべてのCSVファイルを読んでから、RDDに変えてデータをマッサージして、それをCassandraに保存できるようにします。すべてのテキストファイルを処理するのに2時間半かかるため、100GBのデータでは長すぎます。私のコードにもっと速くするためにできることはありますか?データフレームとSQLを使用したPysparkのパフォーマンスの向上

私は何か提案をいただきありがとうございます。私もこのhttps://robertovitillo.com/2015/06/30/spark-best-practices/を読んでみましたが、「正しいレベルの並列処理を使用する」のようなものを実装する方法と混同しています。私はrdd.cacheを実行して自分のRDDをキャッシュに保存しようとしましたが、それでも2時間以上かかりました。ここで

conf = SparkConf() \ 
    .setMaster("spark://%s:%s" % (SPARK_IP, SPARK_PORT)) 

sc = SparkContext(conf=conf) 
sqlContext = SQLContext(sc) 
rdd = sc.textFile("s3a://gdelt-open-data/events/*") 

def rddCleaning(rd,timeframe): 

def check_valid(tup): 
    try: 
     int(tup[1]) 
     int(tup[4]) 
     float(tup[5]) 
     float(tup[6]) 
     return True 
    except ValueError: 
     return False 


def fillin(tup): 
    if tup[2] == "" and tup[3] != "": 
     return ((tup[0],tup[1],tup[3],tup[4],tup[5],tup[6])) 
    else: 
     return ((tup[0],tup[1],tup[2],tup[4],tup[5],tup[6])) 

def popular_avg(curr_tup): 
    lst = curr_tup[1] 
    dictionary = curr_tup[2] 
    dict_matches = {} 
    for tup in lst: 
    event_type = tup[0] 
     dict_matches[event_type] = dictionary[event_type] 
    return ((curr_tup[0],lst,dict_matches,curr_tup[3])) 

def merge_info(tup): 
    main_dict = tup[1] 
    info_dict = tup[2] 
    for key in info_dict: 
     main_dict[key].update(info_dict[key]) 
    main_dict["TotalArticles"] = {"total":tup[3]} 
    return ((tup[0],main_dict)) 

def event_todict(tup): 
    lst = tup[1] 
    dict_matches = {} 
    for event_tup in lst: 
     dict_matches[event_tup[0]] = {"ArticleMentions":event_tup[1]} 
    return ((tup[0],dict_matches,tup[2],tup[3])) 

def sum_allevents(tup): 
    type_lst = tup[1] 
    total_mentions = 0 
    for event in type_lst: 
      total_mentions += event[1] 
    return ((tup[0],type_lst,tup[2],total_mentions)) 

actionGeo_CountryCode = 51 
time = 0 
actor1Type1Code = 12 
actor2Type1Code = 22 
numArticles = 33 
goldsteinScale = 30 
avgTone = 34 

if timeframe == "SQLDATE": 
time = 1 
elif timeframe == "MonthYear": 
time = 2 
else: 
time = 3 




rdd_reduce = rd.map(lambda x: x.split('\t')) \ 
     .map(lambda y: ((y[actionGeo_CountryCode], 
           y[time], 
           y[actor1Type1Code], 
           y[actor2Type1Code], 
           y[numArticles], 
           y[goldsteinScale], 
           y[avgTone]))) \ 
     .filter(check_valid) \ 
     .map(lambda c: ((c[0],int(c[1]),c[2],c[3],int(c[4]),int(float(c[5])),int(float(c[6]))))) \ 
     .map(fillin) \ 
       .filter(lambda r: r[0] in tofullname and r[2] in toevent and r[2] != "" and r[0] != "") \ 
       .map(lambda t: ((tofullname[t[0]],t[1],toevent[t[2]],t[3],t[4],t[5]))) \ 
       .map(lambda f: (((f[0],f[1],f[2]),(f[3],f[4],f[5],1)))) \ 
     .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1], a[2]+b[2], a[3]+b[3])) \ 
     .map(lambda s: ((s[0],(s[1][0],s[1][1]/s[1][3],s[1][2]/s[1][3])))) 


rdd_format = rdd_reduce.map(lambda t:((t[0][0],t[0][1]), 
             ([(t[0][2],t[1][0])], 
             [(t[0][2],{"GoldsteinScaleAvg":t[1][1], 
               "ToneAvg":t[1][2]})]))) \ 
      .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1])) \ 
      .map(lambda v: (v[0], 
             sorted(v[1][0],key=itemgetter(1),reverse=True), 
             v[1][1])) \ 
       .map(sum_allevents) \ 
         .map(lambda f: ((f[0],f[1][:5],dict(f[2]),f[3]))) \ 
         .map(popular_avg) \ 
       .map(event_todict) \ 
         .map(merge_info) \ 
      .map(lambda d: ((d[0][0],d[0][1],d[1]))) 

return rdd_format 




daily_rdd = rddCleaning(rdd,"SQLDATE") 
print(daily_rdd.take(6)); 
monthly_rdd = rddCleaning(rdd,"MonthYear") 
print(monthly_rdd.take(6)); 
yearly_rdd = rddCleaning(rdd,"Year") 
print(yearly_rdd.take(6)); 

は私pysparkランニングの写真です:提案の後に作られた enter image description here

編集: 私は自分のコードに次の変更を加え、それがパフォーマンスを改善したが、それはまだ取っています長い時間。これは起こっているのは、私がdfを呼び出すたびにS3バケットからすべてのファイルをもう一度読み込んでいるからです。 dfと一時テーブルのいくつかをキャッシュに入れるべきですか?私は考えることができる

from pyspark import SparkContext, SparkConf 
from pyspark.sql import SQLContext 
from pyspark.sql.types import * 
from pyspark.sql.functions import udf 
from pyspark.sql.functions import col 
from pyspark.sql.types import StringType, DoubleType, IntegerType 
from abbreviations_dict import tofullname, toevent 
from operator import itemgetter 
import pyspark_cassandra 

sc = SparkContext() 
sqlContext = SQLContext(sc) 

customSchema = schema = StructType([ 
     StructField('GLOBALEVENTID',StringType(),True), 
     StructField('SQLDATE',StringType(),True), 
     StructField('MonthYear',StringType(),True), 
     StructField('Year',StringType(),True), 
     StructField('FractionDate',StringType(),True), 
     StructField('Actor1Code',StringType(),True), 
     StructField('Actor1Name',StringType(),True), 
     StructField('Actor1CountryCode',StringType(),True), 
     StructField('Actor1KnownGroupCode',StringType(),True), 
     StructField('Actor1EthnicCode',StringType(),True), 
     StructField('Actor1Religion1Code',StringType(),True), 
     StructField('Actor1Religion2Code',StringType(),True), 
     StructField('Actor1Type1Code',StringType(),True), 
     StructField('Actor1Type2Code',StringType(),True), 
     StructField('Actor1Type3Code',StringType(),True), 
     StructField('Actor2Code',StringType(),True), 
     StructField('Actor2Name',StringType(),True), 
     StructField('Actor2CountryCode',StringType(),True), 
     StructField('Actor2KnownGroupCode',StringType(),True), 
     StructField('Actor2EthnicCode',StringType(),True), 
     StructField('Actor2Religion1Code',StringType(),True), 
     StructField('Actor2Religion2Code',StringType(),True), 
     StructField('Actor2Type1Code',StringType(),True), 
     StructField('Actor2Type2Code',StringType(),True), 
     StructField('Actor2Type3Code',StringType(),True), 
     StructField('IsRootEvent',StringType(),True), 
     StructField('EventCode',StringType(),True), 
     StructField('EventBaseCode',StringType(),True), 
     StructField('EventRootCode',StringType(),True), 
     StructField('QuadClass',StringType(),True), 
     StructField('GoldsteinScale',StringType(),True), 
     StructField('NumMentions',StringType(),True), 
     StructField('NumSources',StringType(),True), 
     StructField('NumArticles',StringType(),True), 
     StructField('AvgTone',StringType(),True), 
     StructField('Actor1Geo_Type',StringType(),True), 
     StructField('Actor1Geo_FullName',StringType(),True), 
     StructField('Actor1Geo_CountryCode',StringType(),True), 
     StructField('Actor1Geo_ADM1Code',StringType(),True), 
     StructField('Actor1Geo_Lat',StringType(),True), 
     StructField('Actor1Geo_Long',StringType(),True), 
     StructField('Actor1Geo_FeatureID',StringType(),True), 
     StructField('Actor2Geo_Type',StringType(),True), 
     StructField('Actor2Geo_FullName',StringType(),True), 
     StructField('Actor2Geo_CountryCode',StringType(),True), 
     StructField('Actor2Geo_ADM1Code',StringType(),True), 
     StructField('Actor2Geo_Lat',StringType(),True), 
     StructField('Actor2Geo_Long',StringType(),True), 
     StructField('Actor2Geo_FeatureID',StringType(),True), 
     StructField('ActionGeo_Type',StringType(),True), 
     StructField('ActionGeo_FullName',StringType(),True), 
     StructField('ActionGeo_CountryCode',StringType(),True), 
     StructField('ActionGeo_ADM1Code',StringType(),True), 
     StructField('ActionGeo_Lat',StringType(),True), 
     StructField('ActionGeo_Long',StringType(),True), 
     StructField('ActionGeo_FeatureID',StringType(),True), 
     StructField('DATEADDED',StringType(),True), 
     StructField('SOURCEURL',StringType(),True)]) 

df = sqlContext.read \ 
    .format('com.databricks.spark.csv') \ 
    .options(header='false') \ 
    .options(delimiter="\t") \ 
    .load('s3a://gdelt-open-data/events/*', schema = customSchema) 

def modify_values(r,y): 
    if r == '' and y != '': 
    return y 
    else: 
     return r 

def country_exists(r): 
    if r in tofullname: 
     return tofullname[r] 
    else: 
    return '' 

def event_exists(r): 
    if r in toevent: 
    return toevent[r] 
    else: 
    return '' 


modify_val = udf(modify_values, StringType()) 
c_exists = udf(country_exists,StringType()) 
e_exists = udf(event_exists,StringType()) 
dfsub1 = df.withColumn("Actor1Type1Code",modify_val(col("Actor1Type1Code"),col("Actor2Type1Code"))) \ 
      .withColumn("ActionGeo_CountryCode",c_exists(col("ActionGeo_CountryCode"))) \ 
      .withColumn("Actor1Type1Code",e_exists(col("Actor1Type1Code"))) 

sqlContext.registerDataFrameAsTable(dfsub1, 'temp') 
df2 = sqlContext.sql("""SELECT ActionGeo_CountryCode, 
           SQLDATE, MonthYear, Year, 
           Actor1Type1Code, 
           NumArticles, 
           GoldsteinScale, 
           AvgTone 
          FROM temp 
         WHERE ActionGeo_CountryCode <> '' 
          AND Actor1Type1Code <> '' 
          AND NumArticles <> '' 
          AND GoldsteinScale <> '' 
          AND AvgTone <> ''""") 

sqlContext.registerDataFrameAsTable(df2, 'temp2') 
df3 = sqlContext.sql("""SELECT ActionGeo_CountryCode, 
           CAST(SQLDATE AS INTEGER), CAST(MonthYear AS INTEGER), CAST(Year AS INTEGER), 
           Actor1Type1Code, 
           CAST(NumArticles AS INTEGER), 
           CAST(GoldsteinScale AS INTEGER), 
           CAST(AvgTone AS INTEGER) 
          FROM temp2""") 

sqlContext.registerDataFrameAsTable(df3, 'temp3') 
sqlContext.cacheTable('temp3') 

dfdaily = sqlContext.sql("""SELECT ActionGeo_CountryCode, 
            SQLDATE, 
            Actor1Type1Code, 
            SUM(NumArticles) AS NumArticles, 
            ROUND(AVG(GoldsteinScale),0) AS GoldsteinScale, 
            ROUND(AVG(AvgTone),0) AS AvgTone 
           FROM temp3 
           GROUP BY ActionGeo_CountryCode, 
             SQLDATE, 
             Actor1Type1Code""") 

dfmonthly = sqlContext.sql("""SELECT ActionGeo_CountryCode, 
            MonthYear, 
            Actor1Type1Code, 
            SUM(NumArticles) AS NumArticles, 
            ROUND(AVG(GoldsteinScale),0) AS GoldsteinScale, 
            ROUND(AVG(AvgTone),0) as AvgTone 
           FROM temp3 
            GROUP BY ActionGeo_CountryCode, 
            MonthYear, 
            Actor1Type1Code""") 

dfyearly = sqlContext.sql("""SELECT ActionGeo_CountryCode, 
            Year, 
            Actor1Type1Code, 
            SUM(NumArticles) AS NumArticles, 
            ROUND(AVG(GoldsteinScale),0) AS GoldsteinScale, 
            ROUND(AVG(AvgTone),0) as AvgTone 
           FROM temp3 
           GROUP BY ActionGeo_CountryCode, 
             Year, 
             Actor1Type1Code""") 

def rddCleaning(rd,timeframe): 

    def popular_avg(curr_tup): 
     lst = curr_tup[1] 
     dictionary = curr_tup[2] 
     dict_matches = {} 
     for tup in lst: 
     event_type = tup[0] 
      dict_matches[event_type] = dictionary[event_type] 
     return ((curr_tup[0],lst,dict_matches,curr_tup[3])) 

    def merge_info(tup): 
     main_dict = tup[1] 
     info_dict = tup[2] 
     for key in info_dict: 
      main_dict[key].update(info_dict[key]) 
    main_dict["TotalArticles"] = {"total":tup[3]} 
     return ((tup[0],main_dict)) 

    def event_todict(tup): 
     lst = tup[1] 
     dict_matches = {} 
     for event_tup in lst: 
      dict_matches[event_tup[0]] = {"ArticleMentions":event_tup[1]} 
     return ((tup[0],dict_matches,tup[2],tup[3])) 

    def sum_allevents(tup): 
     type_lst = tup[1] 
     total_mentions = 0 
     for event in type_lst: 
       total_mentions += event[1] 
     return ((tup[0],type_lst,tup[2],total_mentions)) 

    rdd_format = rd.map(lambda y: ((y["ActionGeo_CountryCode"],y[timeframe]), 
            ([(y["Actor1Type1Code"],y["NumArticles"])], 
        [(y["Actor1Type1Code"],{"Goldstein":y["GoldsteinScale"],"ToneAvg":y["AvgTone"]})] 
        ))) \ 
      .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1])) \ 
       .map(lambda v: (v[0], 
            sorted(v[1][0],key=itemgetter(1),reverse=True), 
            dict(v[1][1]))) \ 
      .map(sum_allevents) \ 
        .map(popular_avg) \ 
      .map(event_todict) \ 
        .map(merge_info) \ 
       .map(lambda d: ((d[0][0],d[0][1],d[1]))) 

    return rdd_format 

print("THIS IS THE FIRST ONE ######################################################") 
daily_rdd = rddCleaning(dfdaily.rdd,"SQLDATE") 
print(daily_rdd.take(5)) 
print("THIS IS THE SECOND ONE ######################################################") 
monthly_rdd = rddCleaning(dfmonthly.rdd,"MonthYear") 
print(monthly_rdd.take(5)) 
print("THIS IS THE THIRD ONE ######################################################") 
yearly_rdd = rddCleaning(dfyearly.rdd,"Year") 
print(yearly_rdd.take(5)) 
+0

まず、すべてのudfsを緩めます。これらのどれも必要ありません。 – zero323

+0

ありがとうございました!ほんとうにありがとう。 私は、SQLでPython辞書を使う方法がわからないので、ufdsを使用しました。 2つのudfsでは、別のpythonファイルに格納されている辞書に基づいて行の名前を変更しています。たとえば、ある列のセルに「US」がある場合、その列をudfの「United States」に名前を変更します。 SQLで辞書変数を使用できますか? – CatherineAlv

+0

'join'を' broadcast'や[literal map](http://stackoverflow.com/a/32788650/1560062)と併用することができます。 – zero323

答えて

2

最も直接的なものではなく、RDDのデータフレームを使用することです: はここに私のコードです。 基本的に、PythonのRDDは、PythonとJVMの間の変換のために、scalaよりかなり遅いです。また、データフレームには多くの最適化機能があります。

変換を提案しようとすると、ここですべてのコードを実行することは非常に困難ですが、基礎として、csvから直接dataframeに読み込むためにspark.read.csvを使用できます(スキーマを設定して多くの検証が自動的に行われる)、多くの既存の関数は簡単に記述する必要があります。

+0

私がcsvをデータフレームとして読んだら、もはや.mapと.reduce関数は正しく動作しません。私のデータにいくつかの変換を加えるには、.mapと.reduceが必要です。私はマスターと3人の奴隷を使用しています。私はすべてのリソースを使いたいと思っています。 RDDを使用すると、タスクを他のノードと並列化します。 – CatherineAlv

+0

@CatherineAlvよくあることに、RDDとDataFramesの間にいくつかの詳細がある場合 – eliasah

+0

あなたのコードをすばやく見れば、.mapと.reduceで行うことのすべてをデータフレームで行うことができるように思えます(dataframes関数)。 Sparkのデータフレーム(pyspark.sqlのspark sqlパッケージの一部)は、RDDのように(さらに最適化されて)配布されます。欠落しているものがあれば、一般的にはUDFで解くことができます(ただし、Python UDFはスカラーよりも遅く、pyspark.sql.functionsで利用可能な関数よりもはるかに遅いです)。詳細については、spark sqlのプログラミングガイドを参照してください。 –

関連する問題