を列を追加:は、私がpysparkでデータフレームを持ってpysparkにデータフレームと更新へ
ratings = spark.createDataFrame(
sc.textFile("transactions.json").map(lambda l: json.loads(l)),
)
ratings.show()
+--------+-------------------+------------+----------+-------------+-------+
|click_id| created_at| ip|product_id|product_price|user_id|
+--------+-------------------+------------+----------+-------------+-------+
| 123|2016-10-03 12:50:33| 10.10.10.10| 98373| 220.5| 1|
| 124|2017-02-03 11:51:33| 10.13.10.10| 97373| 320.5| 1|
| 125|2017-10-03 12:52:33| 192.168.2.1| 96373| 20.5| 1|
| 126|2017-10-03 13:50:33|172.16.11.10| 88373| 220.5| 2|
| 127|2017-10-03 13:51:33| 10.12.15.15| 87373| 320.5| 2|
| 128|2017-10-03 13:52:33|192.168.1.10| 86373| 20.5| 2|
| 129|2017-08-03 14:50:33| 10.13.10.10| 78373| 220.5| 3|
| 130|2017-10-03 14:51:33| 12.168.1.60| 77373| 320.5| 3|
| 131|2017-10-03 14:52:33| 10.10.30.30| 76373| 20.5| 3|
+--------+-------------------+------------+----------+-------------+-------+
ratings.registerTempTable("transactions")
final_df = sqlContext.sql("select * from transactions");
私はstatus
と呼ばれるこのデータフレームに新しい列を追加し、created_at
とuser_id
に基づいて、ステータス列を更新します。
created_at
とuser_id
は、指定されたテーブルtransations
から読み出されstatus
を返す関数get_status(user_id,created_at)
に渡されます。このstatus
は、対応するuser_id
およびcreated_at
の文字列の中に入れておく必要があります。
pysparkでalterおよびupdateコマンドを実行できますか? これはどのようにpysparkを使って行うことができますか?
'created_at'と' user_id'が与えられたテーブル 'transationsから読み込まれると言います'を返し、' status'を返す 'get_status(user_id、created_at)'関数に渡します。この 'status'は、対応する' user_id'と 'created_at'の新しい列としてトランザクションテーブルに入れなければなりません – Firstname