別のデータフレームでUDFを実行しているときに、どのようにpysparkデータフレームを参照していますか?Pyspark:別のデータフレーム上のUDF内のデータフレームを参照する方法は?
ここにはダミーの例があります。私はscores
とlastnames
の2つのデータフレームを作成しています。それぞれのデータフレーム内で同じデータフレームが2つあります。 scores
に適用されたUDFでは、lastnames
をフィルタリングし、lastname
にある文字列を返します。
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sc = SparkContext("local")
sqlCtx = SQLContext(sc)
# Generate Random Data
import itertools
import random
student_ids = ['student1', 'student2', 'student3']
subjects = ['Math', 'Biology', 'Chemistry', 'Physics']
random.seed(1)
data = []
for (student_id, subject) in itertools.product(student_ids, subjects):
data.append((student_id, subject, random.randint(0, 100)))
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
StructField("student_id", StringType(), nullable=False),
StructField("subject", StringType(), nullable=False),
StructField("score", IntegerType(), nullable=False)
])
# Create DataFrame
rdd = sc.parallelize(data)
scores = sqlCtx.createDataFrame(rdd, schema)
# create another dataframe
last_name = ["Granger", "Weasley", "Potter"]
data2 = []
for i in range(len(student_ids)):
data2.append((student_ids[i], last_name[i]))
schema = StructType([
StructField("student_id", StringType(), nullable=False),
StructField("last_name", StringType(), nullable=False)
])
rdd = sc.parallelize(data2)
lastnames = sqlCtx.createDataFrame(rdd, schema)
scores.show()
lastnames.show()
from pyspark.sql.functions import udf
def getLastName(sid):
tmp_df = lastnames.filter(lastnames.student_id == sid)
return tmp_df.last_name
getLastName_udf = udf(getLastName, StringType())
scores.withColumn("last_name", getLastName_udf("student_id")).show(10)
そして次は、トレースの最後の部分である:代わりにrdd
を作成し、df
にそれを作るの名前
data2 = {}
for i in range(len(student_ids)):
data2[student_ids[i]] = last_name[i]
の容易な検索のための辞書にペアを変更
Py4JError: An error occurred while calling o114.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
UDFの中で 'df'にアクセスすることはできません。これはexecutorで処理され、' df' refはドライバのみからアクセスできるからです。 'lastnames'にブロードキャスト変数を使うことができます。助けが必要なら私に知らせてください。 – mrsrinivas
しかし、UDFから行うのではなく、 'lastnames'を' scores'と結びつけることを考えてください。 – mrsrinivas
こんにちは@mrsrinivas、返信いただきありがとうございます。私の実際の実装では、私はUDF内でより多くの処理を行う必要があるので、このダミーの例は、結合を使用して解決することができます最初に私は結合することはできません。第二に、はい!この場合、どのようにブロードキャスト変数を使用できますか? – tohweizhong