2017-10-16 7 views
0

を列を追加:は、私がpysparkでデータフレームを持ってpysparkにデータフレームと更新へ

ratings = spark.createDataFrame(
    sc.textFile("transactions.json").map(lambda l: json.loads(l)), 
) 
ratings.show() 

+--------+-------------------+------------+----------+-------------+-------+ 
|click_id|   created_at|   ip|product_id|product_price|user_id| 
+--------+-------------------+------------+----------+-------------+-------+ 
|  123|2016-10-03 12:50:33| 10.10.10.10|  98373|  220.5|  1| 
|  124|2017-02-03 11:51:33| 10.13.10.10|  97373|  320.5|  1| 
|  125|2017-10-03 12:52:33| 192.168.2.1|  96373|   20.5|  1| 
|  126|2017-10-03 13:50:33|172.16.11.10|  88373|  220.5|  2| 
|  127|2017-10-03 13:51:33| 10.12.15.15|  87373|  320.5|  2| 
|  128|2017-10-03 13:52:33|192.168.1.10|  86373|   20.5|  2| 
|  129|2017-08-03 14:50:33| 10.13.10.10|  78373|  220.5|  3| 
|  130|2017-10-03 14:51:33| 12.168.1.60|  77373|  320.5|  3| 
|  131|2017-10-03 14:52:33| 10.10.30.30|  76373|   20.5|  3| 
+--------+-------------------+------------+----------+-------------+-------+ 

ratings.registerTempTable("transactions") 
final_df = sqlContext.sql("select * from transactions"); 

私はstatusと呼ばれるこのデータフレームに新しい列を追加し、created_atuser_idに基づいて、ステータス列を更新します。

created_at

user_idは、指定されたテーブルtransationsから読み出されstatusを返す関数get_status(user_id,created_at)に渡されます。このstatusは、対応するuser_idおよびcreated_atの文字列の中に入れておく必要があります。

pysparkでalterおよびupdateコマンドを実行できますか? これはどのようにpysparkを使って行うことができますか?

答えて

0

正確に何をしたいのかは不明です。 window functionsをチェックすると、フレーム内の行を合計、比較することができます。

例えば

import pyspark.sql.functions as psf 
from pyspark.sql import Window 
w = Window.partitionBy("user_id").orderBy(psf.desc("created_at")) 
ratings.withColumn(
    "status", 
    psf.when(psf.row_number().over(w) == 1, "active").otherwise("inactive")).sort("click_id").show() 

+--------+-------------------+------------+----------+-------------+-------+--------+ 
|click_id|   created_at|   ip|product_id|product_price|user_id| status| 
+--------+-------------------+------------+----------+-------------+-------+--------+ 
|  123|2016-10-03 12:50:33| 10.10.10.10|  98373|  220.5|  1|inactive| 
|  124|2017-02-03 11:51:33| 10.13.10.10|  97373|  320.5|  1|inactive| 
|  125|2017-10-03 12:52:33| 192.168.2.1|  96373|   20.5|  1| active| 
|  126|2017-10-03 13:50:33|172.16.11.10|  88373|  220.5|  2|inactive| 
|  127|2017-10-03 13:51:33| 10.12.15.15|  87373|  320.5|  2|inactive| 
|  128|2017-10-03 13:52:33|192.168.1.10|  86373|   20.5|  2| active| 
|  129|2017-08-03 14:50:33| 10.13.10.10|  78373|  220.5|  3|inactive| 
|  130|2017-10-03 14:51:33| 12.168.1.60|  77373|  320.5|  3|inactive| 
|  131|2017-10-03 14:52:33| 10.10.30.30|  76373|   20.5|  3| active| 
+--------+-------------------+------------+----------+-------------+-------+--------+ 

次の2個の既存のものから新しい列を作成するUDFを渡したい場合はそれはあなたに、各ユーザーのラストクリック

を与えます。 は、あなたが引数

from pyspark.sql.types import * 
def get_status(user_id,created_at): 
    ... 

get_status_udf = psf.udf(get_status, StringType()) 

StringType()か、どちらがあなたの関数の出力をデータ型としてuser_idcreated_atを取る機能を持っている

ratings.withColumn("status", get_status_udf("user_id", "created_at")) 
+0

'created_at'と' user_id'が与えられたテーブル 'transationsから読み込まれると言います'を返し、' status'を返す 'get_status(user_id、created_at)'関数に渡します。この 'status'は、対応する' user_id'と 'created_at'の新しい列としてトランザクションテーブルに入れなければなりません – Firstname

関連する問題