行ごとの操作を行う関数を使用して集計するPyspark DataFrameがあります。PySpark DataFrameの行ごとの集約
私は4列を有し、列AI内の各一意の値の列B、C、D
に行ごと凝集を行う必要があり、私はこの方法を使用しています:
- を
は使用して一意の値を取得
A_uniques = df.select('A').distinct()
def func(x): y = df.filter(df.A==x) y = np.array(y.toPandas()) for i in y.shape[0]: y[i,1] = y[i-1,0] y[i,0] = (y[i,0]+y[i,2])/y[i,3] agg = sum(y[:,1]) return agg
A_uniques.rdd.map(lambda x: (x['A'], func(x['A'])))
私はこのエラーを取得しています:
PicklingError: Could not serialize object: Py4JError: An error occurred while calling o64.getnewargs. Trace: py4j.Py4JException: Method getnewargs([]) does not exist at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326) at py4j.Gateway.invoke(Gateway.java:272) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:748)
はRDDSでnumpyの配列を保存する解決策はありますか?または、この操作全体を他の方法で行うことはできますか?
サンプル入力と出力を投稿できるので、いくつかのアプローチを試すことができます。 –
あなたが 'groupby( 'col')を探していると思うでしょう。agg(sum(col2))' –
あなたが持っている問題はあなたがrdd変換から参照していることです。あなたの集計が組み込みのpyspark関数を使用する場合、DataFrame 'groupby(...).gg(...)'を使用することができます。もしそうでなければ、rdd 'groupby'と別注集計を使う必要があります。 – ags29