2017-09-17 4 views
0

をレンジ機能を使用するには、入力データを区切り:私はテキストファイルを作ってきたApacheのスパーク - Pythonの - 私は宇宙の一部のラインを持っているどのようにPyspark

Naresh HDFC 2017 01 
Naresh HDFC 2017 02 
Naresh HDFC 2017 03 
Anoop ICICI 2017 05 
Anoop ICICI 2017 06 
Anoop ICICI 2017 07 

Name Company Start_Date End_Date 
Naresh HDFC 2017-01-01 2017-03-31 
Anoop ICICI 2017-05-01 2017-07-30 

私はのような出力を必要としますこのデータをHadoopクラスタに配置し、コードを書いていますが、出力を取得する際に問題があります。親切に助けてください。 は私がエントリから月を抽出し、レンジ機能でそれらを置く方法を取得しておりませんので、私はハードレンジ機能で3
コードの値がコード化されています:あなたがようpysparkのto_date機能を使用することができます

from pyspark import SparkConf,SparkContext 
from pyspark.sql import SQLContext,Row 
from pyspark.sql.types import * 
import datetime 
sc = SparkContext() 
sqlcon = SQLContext(sc) 

month_map={'01':1,'02':2,'03':3,'04':4,'05':5,'06':6,'07':7,'08':8,'09':9, 
'10':10,'11':11,'12':12} 

def get_month(str): 
    return datetime.date(int(str[:4]),month_map[str[5:7]],int(str[8:10])) 

def parse_line(str): 
    match = str.split() 
    return (Row(name = match[0],type = match[1],start_date = 
    get_month(match[2]),end_date = get_month(match[3]))) 


#-----------------create RDD--------------- 

filepath = '/user/vikasmittal/Innovacer_data.txt' 
rdd1 = sc.textFile(filepath) 
rdd2 =rdd1.map(parse_line) 
for i in range(3): 
    rdd3 = rdd2.map(lambda l:(l.name,l.type,l.start_date.year,i)) 
    print(rdd3.collect()) 

答えて

0

を説明したhere

だけインポートpyspark.sql.functions *

>>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t']) 
>>> df.select(to_date(df.t).alias('date')).collect() 
[Row(date=datetime.date(1997, 2, 28))] 

次のようにして、月を抽出することができます。あなたのデータをロードするデータフレームに変換した後

>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a']) 
>>> df.select(month('a').alias('month')).collect() 
[Row(month=4)] 
2

をし、日付としてStart_DateEnd_Dateをキャストto_dateまたはcast("date")

import pyspark.sql.functions as psf 
df = sqlcon\ 
    .createDataFrame(rdd2, ['Name', 'Company', 'Start_Date', 'End_Date'])\ 
    .withColumn("Start_Date", psf.to_date("Start_Date"))\ 
    .withColumn("End_Date", psf.to_date("End_Date")) 
df.show() 
    +------+-------+----------+----------+ 
    | Name|Company|Start_Date| End_Date| 
    +------+-------+----------+----------+ 
    |Naresh| HDFC|2017-01-01|2017-03-31| 
    | Anoop| ICICI|2017-05-01|2017-07-30| 
    +------+-------+----------+----------+ 

我々はStart_DateEnd_Date間の日付の範囲を計算するためにUDFを適用します:

from dateutil.relativedelta import relativedelta 
def month_range(d1, d2): 
    return [d1 + relativedelta(months=+x) for x in range((d2.year - d1.year)*12 + d2.month - d1.month + 1)] 

import pyspark.sql.functions as psf 
from pyspark.sql.types import * 
month_range_udf = psf.udf(month_range, ArrayType(DateType())) 

我々は今だけの行ごとに1つの日付を取得するにはStart_DateEnd_Dateexplode配列にそれを適用することができます

df = df.withColumn("Date", psf.explode(month_range_udf("Start_Date", "End_Date"))) 
df.show() 

    +------+-------+----------+----------+----------+ 
    | Name|Company|Start_Date| End_Date|  Date| 
    +------+-------+----------+----------+----------+ 
    |Naresh| HDFC|2017-01-01|2017-03-31|2017-01-01| 
    |Naresh| HDFC|2017-01-01|2017-03-31|2017-02-01| 
    |Naresh| HDFC|2017-01-01|2017-03-31|2017-03-01| 
    | Anoop| ICICI|2017-05-01|2017-07-30|2017-05-01| 
    | Anoop| ICICI|2017-05-01|2017-07-30|2017-06-01| 
    | Anoop| ICICI|2017-05-01|2017-07-30|2017-07-01| 
    +------+-------+----------+----------+----------+ 

現在Date列からyearmonthを抽出することができます。

res = df.select(
    "Name", 
    "Company", 
    psf.year("Date").alias("year"), 
    psf.month("Date").alias("month") 
) 
res.show() 

    +------+-------+----+-----+ 
    | Name|Company|year|month| 
    +------+-------+----+-----+ 
    |Naresh| HDFC|2017| 1| 
    |Naresh| HDFC|2017| 2| 
    |Naresh| HDFC|2017| 3| 
    | Anoop| ICICI|2017| 5| 
    | Anoop| ICICI|2017| 6| 
    | Anoop| ICICI|2017| 7| 
    +------+-------+----+-----+ 
関連する問題