に大きなスパークデータフレーム内の行の各サブセットにマップの操作を行うと、私がやりたいことは以下のとおりです。方法:PySpark私はPySparkを使用しています
は大きなスパークデータフレームDFは、すべてのレコードが含まれて。私はこのdfの 'id'カラムで分割されたレコードの各サブセットで並列計算を行いたいと思っています。私は現在の考えることができる方法は次のとおりです。(私は説明するために単純な例を使用します)
dicts = [
{'id': 1, 'name': 'a', 'score': 100},
{'id': 1, 'name': 'b', 'score': 150},
{'id': 2, 'name': 'c', 'score': 200},
{'id': 2, 'name': 'd', 'score': 300},
]
df = spark.createDataFrame(dicts)
from pyspark.sql.functions import (
collect_list,
struct
)
# df_agg will have the following schema: id, a list of structs
df_agg = df.groupBy('id').agg(
collect_list(struct(df.columns)).alias('records')
)
しかし、私は「my_func」は、いくつかの機能である
df_agg.rdd.map(my_func)
をしよう主にSparkのデータフレーム計算を行い、いくつかの問題に直面し、どのように処理するか分からない。 my_funcは行上で動作し、行['records']は構造体のリストを保存します。この構造体のリストをSpark DataFrameに変換するにはどうしたらいいですか?
toDF()は機能しません。 spark.createDataFrame(list、schema)を試しました。元のDFが使用するようにスキーマに入力しても、それは動作しません。
私はこれらのPySpark操作に比較的新しいので、このケースを処理する正しい方法が何であるか教えていただけたら助けてください。
ありがとうございます!
「my_func」とは何ですか?そして、エラーは何ですか? – Psidom
上記の「レコード」の「list_of_struct」をSparkデータフレームに変換し、このdfで操作を続行する機能。単純な例は 'rdd_new = df_agg.rdd.map(lambda r:spark.createDataFrame(r.records))'と書くだけで、rdd_new.collect()を実行すると、次のエラーが発生します: – ColdKing
Py4JError :o25 .__ getnewargs__を呼び出す際にエラーが発生しました。トレース: py4j.Py4JException:メソッド__getnewargsは__([])py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)で \t存在しないpy4j.reflection.ReflectionEngine.getMethodで \t(ReflectionEngine.java:326) py4j.Gateway.invoke(Gateway.java:272)py4j.commands.AbstractCommand.invokeMethodで \t(AbstractCommand.java:132)py4j.commands.CallCommand.executeで \t(CallCommand.java:79) で\t \t py4j.GatewayConnection.run(GatewayConnection.java:214) \t(java.lang.Thread.run)(Thread.java:745) – ColdKing