2017-09-01 5 views
0

私は顧客ごとにいくつかのプロセスに関する情報をホストする顧客テーブルを持っています。RDDを集約して組み合わせる正しい方法

目的は、顧客とプロセスごとに機能を抽出することです。つまり、すべてのフィーチャは、.groupby(customerID, processID)オブジェクトの集計または並べ替え比較の計算になります。

ただし、時間の経過とともにますます多くの機能を追加できるようにすることが目標です。したがって、基本的に、ユーザーはいくつかのフィルター、メトリック、および集計を使用して新しい関数を定義し、この新しい関数を表で操作する関数のプールに追加することができます。

出力はすべての機能を備えたcustomerID、processIDテーブルである必要があります。

だから私startet少し最小限の作業例:彼らはただホールド1要素のラウンドロビンデータベースを返す

def extr_ft_1 (proc_data, limit=100): 

    proc_data = proc_data.filter(proc_data.speed > limit).agg(count(proc_data.speed)) 

    proc_data = proc_data.select(col('count(speed)').alias('speed_feature')) 

    proc_data.show() 

    return proc_data 


def extr_ft_0 (proc_data): 

    max_t = proc_data.agg(spark_max(proc_data.timestamp)) 

    min_t = proc_data.agg(spark_min(proc_data.timestamp)) 

    max_t = max_t.select(col('max(timestamp)').alias('max')) 

    min_t = min_t.select(col('min(timestamp)').alias('min')) 

    X = max_t.crossJoin(min_t) 

    X = X.withColumn('time_feature', X.max+X.min) 

    X = X.drop(X.min).drop(X.max) 

    X.show() 

    return (X) 

l = [('CM1','aa1', 100,0.1),('CM1','aa1', 110,0.2),\ 
    ('CM1','aa1', 110,0.9),('CM1','aa1', 100,1.5),\ 
    ('CX2','bb9', 100,0.1),('CX2','bb9', 100,0.2),\ 
    ('CX2','bb9', 110,6.0),('CX2','bb9', 100,0.18)] 

rdd = sc.parallelize(l) 

df = sqlContext.createDataFrame(rdd,['customid','procid','speed','timestamp']) 

+--------+------+-----+---------+ 
|customid|procid|speed|timestamp| 
+--------+------+-----+---------+ 
|  CM1| aa1| 100|  0.1| 
|  CM1| aa1| 110|  0.2| 
|  CM1| aa1| 110|  0.9| 
|  CM1| aa1| 100|  1.5| 
|  CX2| bb9| 100|  0.1| 
|  CX2| bb9| 100|  0.2| 
|  CX2| bb9| 110|  6.0| 
|  CX2| bb9| 100|  0.18| 
+--------+------+-----+---------+ 

が、私は、これらの機能により抽出され得る2任意の機能を定義します集計値 次に、全ての特徴関数は、所与のプロセスに適用され、各プロセスの結果RDDに組み合わされる:

def get_proc_features(proc, data, *features): 

    proc_data = data.filter(data.customid == proc) 

    features_for_proc = [feature_value(proc_data) for feature_value in features] 



    for number, feature in enumerate(features_for_proc): 

     if number == 0: 

      l = [(proc,'dummy')] 

      rdd = sc.parallelize(l) 

      df = sqlContext.createDataFrame(rdd,['customid','dummy']) 

      df = df.drop(df.dummy) 

      df.show() 

      features_for_proc_rdd = feature 

      features_for_proc_rdd = features_for_proc_rdd.crossJoin(df) 

      continue 

     features_for_proc_rdd = features_for_proc_rdd.crossJoin(feature) 

     features_for_proc_rdd.show() 

    return features_for_proc_rdd 

彼らは最後のステップは、一のデータフレームに各プロセスのための機能を含むすべての行を追加することである。

for number, proc in enumerate(customer_list_1): 

    if number == 0: 

     #results = get_trip_features(trip, df, extr_ft_0, extr_ft_1) 
     results = get_proc_features(proc, df, *extr_feature_funcs) 

     continue 

    results = results.unionAll(get_proc_features(proc, df, *extr_feature_funcs)) 

results.show() 

変換のチェーンはこのように書き:

GET顧客1についての特徴1および2:

+------------+ 
|time_feature| 
+------------+ 
|   1.6| 
+------------+ 

+-------------+ 
|speed_feature| 
+-------------+ 
|   2| 
+-------------+ 

にそれらを組み合わせる:顧客2のために同じことを行い、最終的な結果RDDにすべてRDDSを追加

+------------+--------+-------------+ 
|time_feature|customid|speed_feature| 
+------------+--------+-------------+ 
|   1.6|  CM1|   2| 
+------------+--------+-------------+ 

+------------+--------+-------------+ 
|time_feature|customid|speed_feature| 
+------------+--------+-------------+ 
|   1.6|  CM1|   2| 
|   6.1|  CX2|   1| 
+------------+--------+-------------+ 

私は、クラスタ上でコードを実行した場合、それは2のために働きます顧客。 しかし、それを妥当な量の顧客でテストしたところ、ほとんどがGCとヒープメモリエラーが発生します。

ここでは多くのRDDと連携していますか?私のコードは非常に非効率的ですが、どこで最適化を開始するのか分かりません。私はちょうど最後にアクションを1つ呼び出すと思います(私はライブモードですべてのショーをドロップし、最後のRDDを収集します)。 本当にありがとうございます。

+0

クラスタにはどのような構成(エグゼキュータ、メモリなど)を使用しましたか?データのサイズは?メモリのオーバーヘッドを増やさずにメモリを増やしても有効ではないことに注意してください。 – MaFF

+0

データは約160 TBです.200人のエグゼキュータです。記憶について私は今ではありません。 – JohnnyS

+0

あなたのコードはリファクタリングを必要としますが、問題はRDDではなく、それをフィルタリングしてユニタリキーで作業し、次にクロスジョインするという事実です。値を反復すると、pysparkの分散された部分が失われます。それを行う最善の方法は、データフレームとウィンドウ関数を使用することです。別のテーブルの機能が必要ない場合は、常に1つの作業テーブルを保持する必要があることに注意してください。私はあなたを助けますが、まず 'customer_list_1'と' extr_feature_funcs'は何ですか? – MaFF

答えて

0

あなたのコードはリファクタリングが必要ですが、問題はRDDではなく、単一のキーで作業してからクロスジョインするというフィルタリングという事実です。値を反復すると、pysparkの分散された部分が失われます。別のテーブルの機能が必要ない場合は、常に1つの作業テーブルを保持する必要があることに注意してください。

これを行う最も良い方法は、データフレームとウィンドウ関数を使用することです。

まず者は、自分の関数を書き直してみましょう:

from pyspark.sql import Window 

w = Window.partitionBy("customid") 
df1 = extr_ft_1(df, w) 
df0 = extr_ft_0(df1, w) 
df0.show() 

    +--------+------+-----+---------+-------------+------------+ 
    |customid|procid|speed|timestamp|speed_feature|time_feature| 
    +--------+------+-----+---------+-------------+------------+ 
    |  CM1| aa1| 100|  0.1|   2|   1.6| 
    |  CM1| aa1| 110|  0.2|   2|   1.6| 
    |  CM1| aa1| 110|  0.9|   2|   1.6| 
    |  CM1| aa1| 100|  1.5|   2|   1.6| 
    |  CX2| bb9| 100|  0.1|   1|   6.1| 
    |  CX2| bb9| 100|  0.2|   1|   6.1| 
    |  CX2| bb9| 110|  6.0|   1|   6.1| 
    |  CX2| bb9| 100|  0.18|   1|   6.1| 
    +--------+------+-----+---------+-------------+------------+ 

ここでは、情報を失うことはありません(私たちはすべての行を保持する)ので、あなたがしたい場合:wは、ウィンドウの仕様である

import pyspark.sql.functions as psf 
def extr_ft_1 (proc_data, w, limit=100): 
    return proc_data.withColumn(
     "speed_feature", 
     psf.sum((proc_data.speed > limit).cast("int")).over(w) 
    ) 

def extr_ft_0(proc_data, w): 
    return proc_data.withColumn(
     "time_feature", 
     psf.min(proc_data.timestamp).over(w) + psf.max(proc_data.timestamp).over(w) 
    ) 

できるだけ多くの機能を追加してください。最終的な集計結果が必要な場合は、groupBy("customid")を実行してください。

たとえば、procidを含めるようにウィンドウスペックの集約キーを変更することもできます。

+0

ありがとう、マリー!私は今日それをテストし、それは魅力のように動作します:)メモリエラーはなく、スクリプトは予想以上に速く実行されます。 – JohnnyS

+0

それは問題ではない、私は助けることができてうれしいよ:) – MaFF

関連する問題