2

別のデータフレームでUDFを実行しているときに、どのようにpysparkデータフレームを参照していますか?Pyspark:別のデータフレーム上のUDF内のデータフレームを参照する方法は?

ここにはダミーの例があります。私はscoreslastnamesの2つのデータフレームを作成しています。それぞれのデータフレーム内で同じデータフレームが2つあります。 scoresに適用されたUDFでは、lastnamesをフィルタリングし、lastnameにある文字列を返します。

from pyspark import SparkContext 
from pyspark import SparkConf 
from pyspark.sql import SQLContext 
from pyspark.sql.types import * 

sc = SparkContext("local") 
sqlCtx = SQLContext(sc) 


# Generate Random Data 
import itertools 
import random 
student_ids = ['student1', 'student2', 'student3'] 
subjects = ['Math', 'Biology', 'Chemistry', 'Physics'] 
random.seed(1) 
data = [] 

for (student_id, subject) in itertools.product(student_ids, subjects): 
    data.append((student_id, subject, random.randint(0, 100))) 

from pyspark.sql.types import StructType, StructField, IntegerType, StringType 
schema = StructType([ 
      StructField("student_id", StringType(), nullable=False), 
      StructField("subject", StringType(), nullable=False), 
      StructField("score", IntegerType(), nullable=False) 
    ]) 

# Create DataFrame 
rdd = sc.parallelize(data) 
scores = sqlCtx.createDataFrame(rdd, schema) 

# create another dataframe 
last_name = ["Granger", "Weasley", "Potter"] 
data2 = [] 
for i in range(len(student_ids)): 
    data2.append((student_ids[i], last_name[i])) 

schema = StructType([ 
      StructField("student_id", StringType(), nullable=False), 
      StructField("last_name", StringType(), nullable=False) 
    ]) 

rdd = sc.parallelize(data2) 
lastnames = sqlCtx.createDataFrame(rdd, schema) 


scores.show() 
lastnames.show() 


from pyspark.sql.functions import udf 
def getLastName(sid): 
    tmp_df = lastnames.filter(lastnames.student_id == sid) 
    return tmp_df.last_name 

getLastName_udf = udf(getLastName, StringType()) 
scores.withColumn("last_name", getLastName_udf("student_id")).show(10) 

そして次は、トレースの最後の部分である:代わりにrddを作成し、dfにそれを作るの名前

data2 = {} 
for i in range(len(student_ids)): 
    data2[student_ids[i]] = last_name[i] 

の容易な検索のための辞書にペアを変更

Py4JError: An error occurred while calling o114.__getnewargs__. Trace: 
py4j.Py4JException: Method __getnewargs__([]) does not exist 
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335) 
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344) 
    at py4j.Gateway.invoke(Gateway.java:252) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Thread.java:745) 
+0

UDFの中で 'df'にアクセスすることはできません。これはexecutorで処理され、' df' refはドライバのみからアクセスできるからです。 'lastnames'にブロードキャスト変数を使うことができます。助けが必要なら私に知らせてください。 – mrsrinivas

+0

しかし、UDFから行うのではなく、 'lastnames'を' scores'と結びつけることを考えてください。 – mrsrinivas

+0

こんにちは@mrsrinivas、返信いただきありがとうございます。私の実際の実装では、私はUDF内でより多くの処理を行う必要があるので、このダミーの例は、結合を使用して解決することができます最初に私は結合することはできません。第二に、はい!この場合、どのようにブロードキャスト変数を使用できますか? – tohweizhong

答えて

2

ブロードキャスト変数を作成する

//rdd = sc.parallelize(data2) 
//lastnames = sqlCtx.createDataFrame(rdd, schema) 
lastnames = sc.broadcast(data2) 

ブロードキャスト変数(lastnames)のvalues attrでudfにアクセスしてください。

from pyspark.sql.functions import udf 
def getLastName(sid): 
    return lastnames.value[sid] 
+1

**ブロードキャスト変数**で実装を変更しました。できる限り多くの純粋な関数としてUDFを作成しようとすると、あまりにも多くの外部依存関係がパフォーマンスを低下させる可能性があります。 – mrsrinivas

+0

私は 'lastnames.value'を見ると' '(student1 '、' Granger ')、(student2'、 'Weasley')、(student3 '、' Potter ')というコードスニペットを試しました。 ] 'これは' lastnames.value.filter'がもう正しく動作しないことを意味しますか? – tohweizhong

+0

そうですね。 udfで 'lastnames.value [" sid "]'を実行し、 'sid'をキーとし、値を' lastname'として辞書(変数 'data2')を作成してください。 – mrsrinivas

2

データフレーム(またはRDD)をUDF内部から直接参照することはできません。 DataFrameオブジェクトは、クラスタ上で発生するデータとアクションを表すために使用される、ドライバ上のハンドルです。あなたのUDF内のコードは、Sparkの選択時にクラスタ上で使い果たされます。 Sparkは、このコードをシリアライズし、クロージャーに含まれる変数のコピーを作成して各ワーカーに送信することでこれを行います。

代わりに、Sparkが提供する2つのDataFramesの結合/結合APIを使用します。データセットの1つが小さい場合は、手動でブロードキャスト変数内のデータを送信し、UDFからアクセスできます。それ以外の場合は、2つのデータフレームを作成してから結合操作を使用して組み合わせることができます。このような何か動作するはずです:

joined = scores.withColumnRenamed("student_id", "join_id") 
joined = joined.join(lastnames, joined.join_id == lastnames.student_id)\ 
       .drop("join_id") 
joined.show() 

+---------+-----+----------+---------+ 
| subject|score|student_id|last_name| 
+---------+-----+----------+---------+ 
|  Math| 13| student1| Granger| 
| Biology| 85| student1| Granger| 
|Chemistry| 77| student1| Granger| 
| Physics| 25| student1| Granger| 
|  Math| 50| student2| Weasley| 
| Biology| 45| student2| Weasley| 
|Chemistry| 65| student2| Weasley| 
| Physics| 79| student2| Weasley| 
|  Math| 9| student3| Potter| 
| Biology| 2| student3| Potter| 
|Chemistry| 84| student3| Potter| 
| Physics| 43| student3| Potter| 
+---------+-----+----------+---------+ 

それはフードスパークデータフレームの下にそれがある場合は加入の一部であり、データフレームは、シャッフルを避けるために、放送変数に変換することができ、最適化を持っていること、また、注目に値します十分に小さい。したがって、上記の結合方法を実行すると、より大きなデータセットを処理する能力を犠牲にすることなく、可能な限り最高のパフォーマンスを得ることができます。

関連する問題