2016-12-13 25 views
1

は、私は、次のデータフレームがあると:PySparkデータフレームの操作効率

+----------+-----+----+-------+ 
|display_id|ad_id|prob|clicked| 
+----------+-----+----+-------+ 
|  123| 989| 0.9|  0| 
|  123| 990| 0.8|  1| 
|  123| 999| 0.7|  0| 
|  234| 789| 0.9|  0| 
|  234| 777| 0.7|  0| 
|  234| 769| 0.6|  1| 
|  234| 798| 0.5|  0| 
+----------+-----+----+-------+ 

I次いで(コードの下に示されている)最終的なデータセットを取得するために次の操作を実行し

# Add a new column with the clicked ad_id if clicked == 1, 0 otherwise 
df_adClicked = df.withColumn("ad_id_clicked", when(df.clicked==1, df.ad_id).otherwise(0)) 

# DF -> RDD with tuple : (display_id, (ad_id, prob), clicked) 
df_blah = df_adClicked.rdd.map(lambda x : (x[0],(x[1],x[2]),x[4])).toDF(["display_id", "ad_id","clicked_ad_id"]) 

# Group by display_id and create column with clicked ad_id and list of tuples : (ad_id, prob) 
df_blah2 = df_blah.groupby('display_id').agg(F.collect_list('ad_id'), F.max('clicked_ad_id')) 

# Define function to sort list of tuples by prob and create list of only ad_ids 
def sortByRank(ad_id_list): 
    sortedVersion = sorted(ad_id_list, key=itemgetter(1), reverse=True) 
    sortedIds = [i[0] for i in sortedVersion] 
    return(sortedIds) 

# Sort the (ad_id, prob) tuples by using udf/function and create new column ad_id_sorted 
sort_ad_id = udf(lambda x : sortByRank(x), ArrayType(IntegerType())) 
df_blah3 = df_blah2.withColumn('ad_id_sorted', sort_ad_id('collect_list(ad_id)')) 

# Function to change clickedAdId into an array of size 1 
def createClickedSet(clickedAdId): 
    setOfDocs = [clickedAdId] 
    return setOfDocs 

clicked_set = udf(lambda y : createClickedSet(y), ArrayType(IntegerType())) 
df_blah4 = df_blah3.withColumn('ad_id_set', clicked_set('max(clicked_ad_id)')) 

# Select the necessary columns 
finalDF = df_blah4.select('display_id', 'ad_id_sorted','ad_id_set') 

+----------+--------------------+---------+ 
|display_id|ad_id_sorted  |ad_id_set| 
+----------+--------------------+---------+ 
|234  |[789, 777, 769, 798]|[769] | 
|123  |[989, 990, 999]  |[990] | 
+----------+--------------------+---------+ 

がありますこれを行うより効率的な方法は?私が自分のコードでボトルネックに見えるように、この一連の変換を実行します。フィードバックをいただければ幸いです。

答えて

1

私はタイミングの比較を行っていませんが、UDFを使用しないことでSparkが最適に最適化できるはずだと思います。

#scala: val dfad = sc.parallelize(Seq((123,989,0.9,0),(123,990,0.8,1),(123,999,0.7,0),(234,789,0.9,0),(234,777,0.7,0),(234,769,0.6,1),(234,798,0.5,0))).toDF("display_id","ad_id","prob","clicked") 
#^^^that's^^^ the only difference (besides putting val in front of variables) between this python response and a Scala one 

dfad = sc.parallelize(((123,989,0.9,0),(123,990,0.8,1),(123,999,0.7,0),(234,789,0.9,0),(234,777,0.7,0),(234,769,0.6,1),(234,798,0.5,0))).toDF(["display_id","ad_id","prob","clicked"]) 
dfad.registerTempTable("df_ad") 



df1 = sqlContext.sql("SELECT display_id,collect_list(ad_id) ad_id_sorted FROM (SELECT * FROM df_ad SORT BY display_id,prob DESC) x GROUP BY display_id") 
+----------+--------------------+ 
|display_id|  ad_id_sorted| 
+----------+--------------------+ 
|  234|[789, 777, 769, 798]| 
|  123|  [989, 990, 999]| 
+----------+--------------------+ 

df2 = sqlContext.sql("SELECT display_id, max(ad_id) as ad_id_set from df_ad where clicked=1 group by display_id") 
+----------+---------+ 
|display_id|ad_id_set| 
+----------+---------+ 
|  234|  769| 
|  123|  990| 
+----------+---------+ 


final_df = df1.join(df2,"display_id") 
+----------+--------------------+---------+ 
|display_id|  ad_id_sorted|ad_id_set| 
+----------+--------------------+---------+ 
|  234|[789, 777, 769, 798]|  769| 
|  123|  [989, 990, 999]|  990| 
+----------+--------------------+---------+ 

maxを計算していたため、ad_id_setを1つの値に戻す必要がありませんでした。あなたが本当に配列でそれを必要とすれば、それを実現できると確信しています。

Scalaを使用する将来の人に同様の問題がある場合、微妙なScalaの違いが含まれています。

+0

このソリューションを提供いただき、ありがとうございます。私は両方のソリューションをタイムリーにしました。ソリューションは1.38ミリ秒で実行され、元のソリューションは2.01ミリ秒で実行されました。 :) – user2253546

関連する問題