2017-10-05 3 views
-1

私は3 DATAFRAME持っている:スパークデータフレームの更新値

1. Item dataframe: 

+-------+---------+ 
|id_item|item_code| 
+-------+---------+ 
| 991| A0049| 
| 992| C1248| 
| 993| C0860| 
| 994| C0757| 
| 995| C0682| 
+-------+---------+ 

2. User dataframe: 

+------+--------+ 
|id_usn|  usn| 
+------+--------+ 
|417567|39063291| 
|417568|39063294| 
|417569|39063334| 
|417570|39063353| 
|417571|39063376| 
+------+--------+ 

3. Summary dataframe 

+-------+--------------------+ 
|id_item|  summary  | 
+-------+--------------------+ 
| 991|[[417567,0.579901...| 
| 992|[[417567,0.001029...| 
| 443|[[417585,0.219624...| 
+-------+--------------------+ 

and schema of this dataFrame: 

root 
|-- id_item: integer (nullable = true) 
|-- summary: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- id_usn: long (nullable = true) 
| | |-- rating: double (nullable = true) 

とStructTypeで今、id_usnを、私はid_usnを交換したいですサマリーデータフレームby ユーザデータフレーム内のusn

私はSparkを使用しています!

私はこの問題を解決してください!

+1

** 'join'の後に' select'を使ってみましたか?私は、質問があまりにも単純すぎて有効であるとは思っていません。 –

+0

私はそれを試すことができますが、私の問題は構造体であるため、id_usnを置き換えます。 –

+0

@JacekLaskowski:この問題の原因はありますか? –

答えて

1

希望します。

from pyspark.sql import functions as F 

sdf1 = summarydf.select('id_item','summary',F.explode('summary').alias('col_summary')).select('*',F.col('col_summary').id_usn.alias('id_usn'),F.col('col_summary').rating.alias('rating')).drop('col_summary') 
df = sdf1.join(itemdf,'id_item').join(userdf,'id_usn').select('item_code',F.struct('usn','rating').alias('tmpcol')).groupby('item_code').agg(F.collect_list('tmpcol').alias('summary')) 
+---------+--------------------+ 
|item_code|    summary| 
+---------+--------------------+ 
| C1248|[[39063291,0.0010...| 
| A0049|[[39063291,0.5799...| 
+---------+--------------------+ 
関連する問題