まず、2つのデータフレームに左外部結合を行い、次に右データフレームの列の1つにwhen
とotherwise
関数を使用します。ここで私が試した完全なソリューションがある -
from pyspark.sql import functions as F
from pyspark.sql.functions import col
# this is just data input
data1 = [[4,3,3],[2,4,3],[4,2,4],[4,3,3]]
data2 = [[4,3,3],[2,3,3],[4,1,4]]
# create dataframes
df1 = spark.createDataFrame(data1,schema=['userId','sku_id','type'])
df2 = spark.createDataFrame(data2,schema=['userId','sku_id','type'])
# condition for join
cond=[df1.userId==df2.userId,df1.sku_id==df2.sku_id,df1.type==df2.type]
# magic
df1.join(df2,cond,how='left_outer')\
.select(df1.userId,df1.sku_id,df1.type,df2.userId.alias('uid'))\
.withColumn('label',F.when(col('uid')>0 ,1).otherwise(0))\
.drop(col('uid'))\
.show()
出力:
+------+------+----+-----+
|userId|sku_id|type|label|
+------+------+----+-----+
| 2| 4| 3| 0|
| 4| 3| 3| 1|
| 4| 3| 3| 1|
| 4| 2| 4| 0|
+------+------+----+-----+