私は顧客ごとにいくつかのプロセスに関する情報をホストする顧客テーブルを持っています。RDDを集約して組み合わせる正しい方法
目的は、顧客とプロセスごとに機能を抽出することです。つまり、すべてのフィーチャは、.groupby(customerID, processID)
オブジェクトの集計または並べ替え比較の計算になります。
ただし、時間の経過とともにますます多くの機能を追加できるようにすることが目標です。したがって、基本的に、ユーザーはいくつかのフィルター、メトリック、および集計を使用して新しい関数を定義し、この新しい関数を表で操作する関数のプールに追加することができます。
出力はすべての機能を備えたcustomerID、processIDテーブルである必要があります。
だから私startet少し最小限の作業例:彼らはただホールド1要素のラウンドロビンデータベースを返す
def extr_ft_1 (proc_data, limit=100):
proc_data = proc_data.filter(proc_data.speed > limit).agg(count(proc_data.speed))
proc_data = proc_data.select(col('count(speed)').alias('speed_feature'))
proc_data.show()
return proc_data
def extr_ft_0 (proc_data):
max_t = proc_data.agg(spark_max(proc_data.timestamp))
min_t = proc_data.agg(spark_min(proc_data.timestamp))
max_t = max_t.select(col('max(timestamp)').alias('max'))
min_t = min_t.select(col('min(timestamp)').alias('min'))
X = max_t.crossJoin(min_t)
X = X.withColumn('time_feature', X.max+X.min)
X = X.drop(X.min).drop(X.max)
X.show()
return (X)
:
l = [('CM1','aa1', 100,0.1),('CM1','aa1', 110,0.2),\
('CM1','aa1', 110,0.9),('CM1','aa1', 100,1.5),\
('CX2','bb9', 100,0.1),('CX2','bb9', 100,0.2),\
('CX2','bb9', 110,6.0),('CX2','bb9', 100,0.18)]
rdd = sc.parallelize(l)
df = sqlContext.createDataFrame(rdd,['customid','procid','speed','timestamp'])
+--------+------+-----+---------+
|customid|procid|speed|timestamp|
+--------+------+-----+---------+
| CM1| aa1| 100| 0.1|
| CM1| aa1| 110| 0.2|
| CM1| aa1| 110| 0.9|
| CM1| aa1| 100| 1.5|
| CX2| bb9| 100| 0.1|
| CX2| bb9| 100| 0.2|
| CX2| bb9| 110| 6.0|
| CX2| bb9| 100| 0.18|
+--------+------+-----+---------+
が、私は、これらの機能により抽出され得る2任意の機能を定義します集計値 次に、全ての特徴関数は、所与のプロセスに適用され、各プロセスの結果RDDに組み合わされる:
def get_proc_features(proc, data, *features):
proc_data = data.filter(data.customid == proc)
features_for_proc = [feature_value(proc_data) for feature_value in features]
for number, feature in enumerate(features_for_proc):
if number == 0:
l = [(proc,'dummy')]
rdd = sc.parallelize(l)
df = sqlContext.createDataFrame(rdd,['customid','dummy'])
df = df.drop(df.dummy)
df.show()
features_for_proc_rdd = feature
features_for_proc_rdd = features_for_proc_rdd.crossJoin(df)
continue
features_for_proc_rdd = features_for_proc_rdd.crossJoin(feature)
features_for_proc_rdd.show()
return features_for_proc_rdd
彼らは最後のステップは、一のデータフレームに各プロセスのための機能を含むすべての行を追加することである。
for number, proc in enumerate(customer_list_1):
if number == 0:
#results = get_trip_features(trip, df, extr_ft_0, extr_ft_1)
results = get_proc_features(proc, df, *extr_feature_funcs)
continue
results = results.unionAll(get_proc_features(proc, df, *extr_feature_funcs))
results.show()
変換のチェーンはこのように書き:
GET顧客1についての特徴1および2:
+------------+
|time_feature|
+------------+
| 1.6|
+------------+
+-------------+
|speed_feature|
+-------------+
| 2|
+-------------+
にそれらを組み合わせる:顧客2のために同じことを行い、最終的な結果RDDにすべてRDDSを追加
+------------+--------+-------------+
|time_feature|customid|speed_feature|
+------------+--------+-------------+
| 1.6| CM1| 2|
+------------+--------+-------------+
:
+------------+--------+-------------+
|time_feature|customid|speed_feature|
+------------+--------+-------------+
| 1.6| CM1| 2|
| 6.1| CX2| 1|
+------------+--------+-------------+
私は、クラスタ上でコードを実行した場合、それは2のために働きます顧客。 しかし、それを妥当な量の顧客でテストしたところ、ほとんどがGCとヒープメモリエラーが発生します。
ここでは多くのRDDと連携していますか?私のコードは非常に非効率的ですが、どこで最適化を開始するのか分かりません。私はちょうど最後にアクションを1つ呼び出すと思います(私はライブモードですべてのショーをドロップし、最後のRDDを収集します)。 本当にありがとうございます。
クラスタにはどのような構成(エグゼキュータ、メモリなど)を使用しましたか?データのサイズは?メモリのオーバーヘッドを増やさずにメモリを増やしても有効ではないことに注意してください。 – MaFF
データは約160 TBです.200人のエグゼキュータです。記憶について私は今ではありません。 – JohnnyS
あなたのコードはリファクタリングを必要としますが、問題はRDDではなく、それをフィルタリングしてユニタリキーで作業し、次にクロスジョインするという事実です。値を反復すると、pysparkの分散された部分が失われます。それを行う最善の方法は、データフレームとウィンドウ関数を使用することです。別のテーブルの機能が必要ない場合は、常に1つの作業テーブルを保持する必要があることに注意してください。私はあなたを助けますが、まず 'customer_list_1'と' extr_feature_funcs'は何ですか? – MaFF