2017-07-27 3 views
0

In Pyspark - 表Aのcolumn(listed_1)の列値をの値でwhere condition (B.list_expire_value) > 5 || (B.list_date) < 6に設定する方法。 (B)は、それらが表Bの列であることを示すことです。pysparkの異なるテーブルから列の値を設定するには?

spark_df = table_1.join("table_2", on ="uuid").when((table_2['list_expire_value'] > 5) | (table_2['list_date'] < 6)).withColumn("listed_1", table_2['list_date']) 

をしかし、私はエラーを取得しています:

現在、私はやっています。これを行う方法?

 
Sample table : 

Table A 
uuid listed_1 
001 abc 
002 def 
003 ghi 

Table B 
uuid list_date list_expire_value  col4 
001  12   7      dckvfd 
002  14   3      dfdfgi 
003  3   8      sdfgds 

Expected Output 
uuid listed1  list_expire_value  col4 
001  12   7      dckvfd 
002  def   3      dfdfgi 
003  3   8      sdfgds 

002 of listed1 will not be replaced since they do not fufil the when conditions. 

+0

@mtotoは、出力が期待される追加された。忘れてはいけません – Viv

+0

@tbone、sqlContextを使用すると、Update文になり、col value = xに設定されます。それは火花の権利で許可されていません? – Viv

+0

いいえ、SQL結合の結果である新しいデータフレームを作成してください – tbone

答えて

1

正しいフォームは

from pyspark.sql import functions as F 
spark_df = table_1.join(table_2, 'uuid', 'inner').withColumn('list_expire_value',F.when((table_2.list_expire_value > 5) | (table_2.list_date < 6), table_1.listed_1).otherwise(table_2.list_date)).drop(table_1.listed_1) 
1

希望します。

from pyspark.sql.functions import udf 
from pyspark.sql.types import StringType 

A = sc.parallelize([('001','abc'),('002','def'),('003','ghi')]).toDF(['uuid','listed_1']) 
B = sc.parallelize([('001',12,7,'dckvfd'),('002',14,3,'dfdfgi'),('003',3,8,'sdfgds')]).\ 
    toDF(['uuid','list_date','list_expire_value','col4']) 

def cond_fn(x, y, z): 
    if (x > 5 or y < 6): 
     return y 
    else: 
     return z 

final_df = A.join(B, on="uuid") 
udf_val = udf(cond_fn, StringType()) 
final_df = final_df.withColumn("listed1",udf_val(final_df.list_expire_value,final_df.list_date, final_df.listed_1)) 
final_df.select(["uuid","listed1","list_expire_value","col4"]).show() 


それはあなたの問題を解決した場合、私たちが知っているように:) pysparkのSQLクエリの

関連する問題