2016-08-03 33 views

答えて

4

次は、Python(Spark 1.6 +)でDataframe APIを実装した実装例です。

import pyspark.sql.functions as F 
import numpy as np 
from pyspark.sql.types import FloatType 

それではは "salaries" sparkのdataframeであるIn顧客のための毎月の持っていると仮定しましょうsalaries。

月を| customer_id |給与

、我々はすべての月

を通じて顧客ごとの給与の中央値を見つけたい

ステップ1:給与に集約:中央

def find_median(values_list): 
    try: 
     median = np.median(values_list) #get the median of values in a list in each row 
     return round(float(median),2) 
    except Exception: 
     return None #if there is anything wrong with the given values 

median_finder = F.udf(find_median,FloatType()) 

ステップ2を計算するために、ユーザー定義関数を書きますそれぞれの行の給与リストにそれらを集めることによって計算されます。

salaries_list = salaries.groupBy("customer_id").agg(F.collect_list("salary").alias("salaries")) 

ステップ3:給与計算で、median_finder udfを呼び出します。col新しい列として中央値を追加する

salaries_list = salaries_list.withColumn("median",median_finder("salaries")) 
+1

np.nanmedian(values_list)を使用するとNaNが無視され、時にはより良い選択です –

関連する問題