2017-11-29 1 views
0

こんにちは私はいくつかの将来の処理のためにきれいにしようとしているやや難しいファイル形式を扱っています。私はPysparkを使ってデータをデータフレームに処理しています。PySparkは名前に基づいて複数の列にリストを分解します

ファイルはこのようになります。その中に符号化さ

AA 1234 ZXYW 
BB A 890 
CC B 321 
AA 1234 LMNO 
BB D 123 
CC E 321 
AA 1234 ZXYW 
CC E 456 

それぞれ「AA」レコードは、論理グループまたはレコードの開始を定義し、各ライン上のデータは固定長であり、持っている情報をそのI抽出したい。少なくとも20-30種類のレコードタイプがあります。それらは常に各行の先頭に2文字のコードで識別されます。第一段階として(すなわちないすべてのレコードタイプは、グループごとに存在している)、各グループに

1または多数の異なるレコードタイプが存在することができ、私はこの形式でまとめてグループにレコードを管理している:

+----------------+---------------------------------+ 
|   index|       result| 
+----------------+---------------------------------+ 
|    1|[AA 1234 ZXYV,BB A 890,CC B 321]| 
|    2|[AA 1234 LMNO,BB D 123,CC E 321]| 
|    3|[AA 1234 ZXYV,CC B 321]   | 
+----------------+---------------------------------+ 

そして、私は本当にデータフレーム内の次の列にデータを取得したい第二段階として:

+----------------+---------------------------------+-------------+--------+--------+ 
|   index|       result|   AA|  BB|  CC| 
+----------------+---------------------------------+-------------+--------+--------+ 
|    1|[AA 1234 ZXYV,BB A 890,CC B 321]|AA 1234 ZXYV|BB A 890|CC B 321| 
|    2|[AA 1234 LMNO,BB D 123,CC E 321]|AA 1234 LMNO|BB D 123|CC E 321| 
|    3|[AA 1234 ZXYV,CC B 321]   |AA 1234 ZXYV| Null|CC B 321| 
+----------------+---------------------------------+-------------+--------+--------+ 

その時点で私は些細なことする必要があり、必要な情報を抽出するので。

どのようにすればよいでしょうか?

多くのありがとうございます。

答えて

1

これを実現するにはflatMappivotを使用できます。第一段階からの結果から出発

rdd = sc.parallelize([(1,['AA 1234 ZXYV','BB A 890','CC B 321']), 
         (2,['AA 1234 LMNO','BB D 123','CC E 321']), 
         (3,['AA 1234 ZXYV','CC B 321'])]) 

df = rdd.toDF(['index', 'result']) 

あなたが最初flatMapを使用して複数の行に配列を爆発し、別の列に二文字の識別子を抽出することができます。

df_flattened = df.rdd.flatMap(lambda x: [(x[0],y, y[0:2],y[3::]) for y in x[1]])\ 
       .toDF(['index','result', 'identifier','identifiertype']) 

と列名に2文字の識別子を変更するpivotを使用します。

df_result = df_flattened.groupby(df_flattened.index,)\ 
         .pivot("identifier")\ 
         .agg(first("identifiertype"))\ 
         .join(df,'index') 

私はRDDに変換せずに配列を爆発するバックresult列を取得するために参加

+0

で旋回させるためのいくつかの良い例を見つけることができます。あなたの助けをありがとう。 – robarthur1

2

代替方法を追加しました、

from pyspark.sql import functions as F 

udf1 = F.udf(lambda x : x.split()[0]) 
df.select('index',F.explode('result').alias('id'),udf1(F.col('id')).alias('idtype')).show() 

+-----+-------------+------+ 
|index|   id|idtype| 
+-----+-------------+------+ 
| 1|AA 1234 ZXYV| AA| 
| 1|  BB A 890| BB| 
| 1|  CC B 321| CC| 
| 2|AA 1234 LMNO| AA| 
| 2|  BB D 123| BB| 
| 2|  CC E 321| CC| 
| 3|AA 1234 ZXYV| AA| 
| 3|  CC B 321| CC| 
+-----+-------------+------+ 

df1.groupby('index').pivot('idtype').agg(F.first('id')).join(df,'index').show() 
0

あなたはusi Spark 2.xでは、あなたが探しているのは、スパークデータフレームのピボット操作です。

最初に、2つの文字列、2文字のエンコーディング、および残りのコンテンツを別の列に作成することができます。次に、データフレームのピボットを使用して、これを行うことができます。

df.pivot("encoding_col",Seq("AA","BB")) 

あなたは私が必要とまさに、絶対に完璧に働いたデータフレームhere

関連する問題