2016-10-24 6 views
0

PySparkを使ってRDDに関数を適用し、その結果を新しい列に入れる方法を探しています。データフレームと、それは簡単になります。 を考える:RDDで関数を使用して新しい列(Pyspark)を取得する方法は?

rdd = sc.parallelize([(u'1751940903', u'2014-06-19', '2016-10-19'), (u'_guid_VubEgxvPPSIb7W5caP-lXg==', u'2014-09-10', '2016-10-19')]) 

私のコードは次のようになります。そして、

df= rdd.toDF(['gigya', 'inscription','d_date']) 
df.show() 
+--------------------+-------------------------+----------+ 
|    gigya|    inscription| d_date| 
+--------------------+-------------------------+----------+ 
|   1751940903|    2014-06-19|2016-10-19| 
|_guid_VubEgxvPPSI...|    2014-09-10|2016-10-19| 
+--------------------+-------------------------+----------+ 

from pyspark.sql.functions import split, udf, col 
get_period_day = udf(lambda item : datetime.strptime(item, "%Y-%m-%d").timetuple().tm_yday) 

df.select('d_date', 'gigya', 'inscription', get_period_day(col('d_date')).alias('period_day')).show() 

+----------+--------------------+-------------------------+----------+ 
| d_date|    gigya|inscription_service_6Play|period_day| 
+----------+--------------------+-------------------------+----------+ 
|2016-10-19|   1751940903|    2014-06-19|  293| 
|2016-10-19|_guid_VubEgxvPPSI...|    2014-09-10|  293| 
+----------+--------------------+-------------------------+----------+ 

をせずに同じことを行う方法があります私のRDDをDataFrameに変換する必要はありますか?

rdd.map(lambda x: datetime.strptime(x[1], '%Y-%m-%d').timetuple().tm_yday).cache().collect() 

ヘルプ:exempleためのマップで何か..

このコードは、ちょうど私に期待される結果から一部を与えることができますか?

答えて

2

試してみてください。

rdd.map(lambda x: 
    x + (datetime.strptime(x[1], '%Y-%m-%d').timetuple().tm_yday,)) 

か:

def g(x): 
    return x + (datetime.strptime(x[1], '%Y-%m-%d').timetuple().tm_yday,) 

rdd.map(g) 
+0

LostInOverflow:ありがとうございます!あなたはロック! – DataAddicted

関連する問題