2017-09-22 9 views
0

私はそうのような構造のファイルの非常に迷惑なセットを持っている:スパーク/ハイブ - 「ピボットテーブル」形式にグループのデータ

userId string, 
eventType string, 
source string, 
errorCode string, 
startDate timestamp, 
endDate timestamp 

各ファイルは様々で、イベントIDあたりのレコードの任意の数を含むことができeventTypesとsources、およびそれぞれ異なるコードと開始日と終了日を指定します。

これらのすべてをuserIdにグループ化する方法はありますか?キー値のようなものです。値はuserIdに関連付けられたすべてのフィールドのリストですか?具体的には、eventTypeとsourceをキーにしたいと思っています。基本的には、ピボットテーブルのような幅のテーブルの長さを交換したいと思っています。このための私の目標は、将来的にはより迅速な分析のために、最終的にApache ParquetまたはAvroファイル形式として保存されることです。

ここでは例です:

ソースデータ:

userId, eventType, source, errorCode, startDate, endDate 
552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452' 
284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775' 
552113, 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229' 
552113, 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976' 
284723, 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947' 
552113, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623' 
284723, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777' 

目標:

userId, eventTypeAckProvider, sourceAckProvider, errorCodeAckProvider, startDateAckProvider, endDateAckProvider, eventTypeTradeMerch, sourceTradeMerch, errorCodeTradeMerch, startDateTradeMerch, endDateTradeMerch, eventTypeChargeMerch, sourceChargeMerch, errorCodeChargeMerch, startDateChargeMerch, endDateChargeMerch, eventTypeCloseProvider, sourceCloseProvider, errorCodeCloseProvider, startDateCloseProvider, endDateCloseProvider, eventTypeRefundMerch, sourceRefundMerch, errorCodeRefundMerch, startDateRefundMerch, endDateRefundMerch 
552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452', 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229', 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623', NULL, NULL, NULL, NULL, NULL 
284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777', 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947' 

フィールド名または順序限り、私はそれらを区別することができますよう、問題ではありません。私はすでにこれを動作させるために二つの方法を試してみた

  1. は手動でテーブルからそれぞれの組み合わせを選択して、マスターデータセットに参加します。これはうまく動作し、よくパラレル化されますが、キーフィールドの値の任意の数を許可せず、スキーマを事前定義する必要があります。
  2. キーの辞書を作成するには、Sparkを使用します。valueレコード各値は辞書です。基本的にデータセットをループし、存在しない場合は辞書に新しいキーを追加し、そのエントリに対して、存在しない場合は新しいフィールドを値辞書に追加します。これは美しく動作しますが、非常に遅く、まったく動作しないとうまく並列化されません。また、それがAvro/Parquet互換フォーマットであるかどうかはわかりません。

これらの2つの方法の代替方法はありますか?または私の目標よりも優れた構造ですか?

答えて

1

このようなことをしたいですか?

from pyspark.sql.functions import struct, col, create_map, collect_list 

df = sc.parallelize([ 
    ['552113', 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452'], 
    ['284723', 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775'], 
    ['552113', 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229'], 
    ['552113', 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976'], 
    ['284723', 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'], 
    ['552113', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623'], 
    ['284723', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777'] 
]).toDF(('userId', 'eventType', 'source', 'errorCode', 'startDate', 'endDate')) 
df.show() 

new_df = df.withColumn("eventType_source", struct([col('eventType'), col('source')])).\ 
    withColumn("errorCode_startEndDate", struct([col('errorCode'), col('startDate'), col('endDate')])) 

new_df = new_df.groupBy('userId').agg(collect_list(create_map(col('eventType_source'), col('errorCode_startEndDate'))).alias('event_detail')) 
new_df.show() 
+0

おかげでこれを試してみて、自分の意見を与えることができます!これはうまくいくようです!私はライブデータセットで試してみましたが、グループ化の方法に関しては、私が望むものをかなり返しています。しかし、私は "地図のリスト"のデータ構造に精通しておらず、操作のどこにでも記載されたものは何も見つかりません。私は、どのように私はこのデータ構造と対話するのでしょうか?たとえば、特定のユーザーのCHARGE/MERCH属性を取得するにはどうすればよいですか? –

+1

助けてくれてうれしい!これは、フォローアップの質問を開始するのに役立つと思います: 'itertools import chainから; new_df.printSchema();rdd1 = new_df.where(col( 'userId')== '552113')select( 'event_detail')。rdd.flatMap(lambda x:chain(*(x))); keys = rdd1.map(ラムダx:x.keys())。 values = rdd1.map(lambda x:x.values())。collect(); '' keys'と 'values'は調べる必要のあるものです。 – Prem

0

あなたは、

>>> from pyspark.sql import SparkSession 
>>> from pyspark.sql import functions as F 
>>> from pyspark.sql.types import * 

>>> spark = SparkSession.builder.getOrCreate() 

>>> l=[(552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452'),(284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775'),(552113, 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229'),(552113, 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976'),(284723, 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'),(552113, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623'),(284723, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777')] 

>>> df = spark.createDataFrame(l,['userId', 'eventType', 'source', 'errorCode', 'startDate','endDate']) 
>>> df.show(10,False) 
+------+---------+--------+---------+-----------------------+-----------------------+ 
|userId|eventType|source |errorCode|startDate    |endDate    | 
+------+---------+--------+---------+-----------------------+-----------------------+ 
|552113|ACK  |PROVIDER|0  |2017-09-01 12:01:45.432|2017-09-01 12:01:45.452| 
|284723|ACK  |PROVIDER|0  |2017-09-01 12:01:45.675|2017-09-01 12:01:45.775| 
|552113|TRADE |MERCH |0  |2017-09-01 12:01:47.221|2017-09-01 12:01:46.229| 
|552113|CHARGE |MERCH |0  |2017-09-01 12:01:48.123|2017-09-01 12:01:48.976| 
|284723|REFUND |MERCH |1  |2017-09-01 12:01:48.275|2017-09-01 12:01:48.947| 
|552113|CLOSE |PROVIDER|0  |2017-09-01 12:01:49.908|2017-09-01 12:01:50.623| 
|284723|CLOSE |PROVIDER|0  |2017-09-01 12:01:50.112|2017-09-01 12:01:50.777| 
+------+---------+--------+---------+-----------------------+-----------------------+ 

>>> myudf = F.udf(lambda *cols : cols,ArrayType(StringType())) #composition to create rowwise list 
>>> df1 = df.select('userId',myudf('eventType', 'source', 'errorCode','startDate', 'endDate').alias('val_list')) 

>>> df2 = df1.groupby('userId').agg(F.collect_list('val_list')) # grouped on userId 

>>> eventtypes = ['ACK','TRADE','CHARGE','CLOSE','REFUND'] # eventtypes and the order required in output 

>>> def f(Vals): 
     aggVals = [typ for x in eventtypes for typ in Vals if typ[0] == x] # to order the grouped data based on eventtypes above 
     if len(aggVals) == 5: 
      return aggVals 
     else: 
      missngval = [(idx,val) for idx,val in enumerate(eventtypes)if val not in zip(*aggVals)[0]] # get missing eventtypes with their index to create null 
      for idx,val in missngval: 
       aggVals.insert(idx,[None]*5) 
      return aggVals 

>>> myudf2 = F.udf(f,ArrayType(ArrayType(StringType()))) 
>>> df3 = df2.select('userId',myudf2('agg_list').alias('values')) 

>>> df4 = df3.select(['userId']+[df3['values'][i][x] for i in range(5) for x in range(5)]) # to select from Array[Array] 

>>> oldnames = df4.columns 
>>> destnames = ['userId', 'eventTypeAckProvider', 'sourceAckProvider', 'errorCodeAckProvider', 'startDateAckProvider', 'endDateAckProvider', 'eventTypeTradeMerch', 'sourceTradeMerch', 'errorCodeTradeMerch', 'startDateTradeMerch', 'endDateTradeMerch', 'eventTypeChargeMerch', 'sourceChargeMerch', 'errorCodeChargeMerch', 'startDateChargeMerch', 'endDateChargeMerch', 'eventTypeCloseProvider', 'sourceCloseProvider', 'errorCodeCloseProvider', 'startDateCloseProvider', 'endDateCloseProvider', 'eventTypeRefundMerch', 'sourceRefundMerch', 'errorCodeRefundMerch', 'startDateRefundMerch', 'endDateRefundMerch'] 

>>> finalDF = reduce(lambda d,idx : d.withColumnRenamed(oldnames[idx],destnames[idx]),range(len(oldnames)),df4) # Renaming the columns 
>>> finalDF.show()  
+------+--------------------+-----------------+--------------------+-----------------------+-----------------------+-------------------+----------------+-------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+----------------------+-------------------+----------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+ 
|userId|eventTypeAckProvider|sourceAckProvider|errorCodeAckProvider|startDateAckProvider |endDateAckProvider  |eventTypeTradeMerch|sourceTradeMerch|errorCodeTradeMerch|startDateTradeMerch |endDateTradeMerch  |eventTypeChargeMerch|sourceChargeMerch|errorCodeChargeMerch|startDateChargeMerch |endDateChargeMerch  |eventTypeCloseProvider|sourceCloseProvider|errorCodeCloseProvider|startDateCloseProvider |endDateCloseProvider |eventTypeRefundMerch|sourceRefundMerch|errorCodeRefundMerch|startDateRefundMerch |endDateRefundMerch  | 
+------+--------------------+-----------------+--------------------+-----------------------+-----------------------+-------------------+----------------+-------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+----------------------+-------------------+----------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+ 
|284723|ACK     |PROVIDER   |0     |2017-09-01 12:01:45.675|2017-09-01 12:01:45.775|null    |null   |null    |null     |null     |null    |null    |null    |null     |null     |CLOSE     |PROVIDER   |0      |2017-09-01 12:01:50.112|2017-09-01 12:01:50.777|REFUND    |MERCH   |1     |2017-09-01 12:01:48.275|2017-09-01 12:01:48.947| 
|552113|ACK     |PROVIDER   |0     |2017-09-01 12:01:45.432|2017-09-01 12:01:45.452|TRADE    |MERCH   |0     |2017-09-01 12:01:47.221|2017-09-01 12:01:46.229|CHARGE    |MERCH   |0     |2017-09-01 12:01:48.123|2017-09-01 12:01:48.976|CLOSE     |PROVIDER   |0      |2017-09-01 12:01:49.908|2017-09-01 12:01:50.623|null    |null    |null    |null     |null     | 
+------+--------------------+-----------------+--------------------+-----------------------+-----------------------+-------------------+----------------+-------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+----------------------+-------------------+----------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+ 
関連する問題