2017-03-14 19 views
0

thisイメージのようなファイルを以下のデータセットで作成したいとします。 結果は、この行でデータフレームをフィルタリングした結果である:行のpysparkの行をRDDに変換する

df1 = df0.rdd.filter(lambda x: 'VS' in x.MeterCategory) 
    .map(lambda x: [x.vId,x.Meters]).take(2) 

のDataSet:

[ABCD1234, Row(0=6.0, 10=None, 100=None, 1000=None, 10000=None, 1000000=None, 100000000=None, 10235=None, 1024=None)] 
[WXYZ9999,Row(0=40.0, 10=None, 100=None, 1000=None, 10000=None, 1000000=None, 100000000=None, 10235=None, 1024=None)] 

https://i.stack.imgur.com/8nUkH.png

私は私が見つけたことを、いくつかのアプローチをしようとしてきましたこのフォーラムは、私は結果を達成することはできません。 あなたの例のデータを使用したおかげで

+0

ありがとう、両方のソリューションが動作します。 しかし、今、dev。チームは。(ドット)でフィールド名を追加し、pysparkはフィールド名を見つけることができません。 for..in ...を使用すると、この命令がノードによって処理されても、ワーカーノードのパフォーマンスが低下するかどうかは分かりません。 クラスタは、デフォルト設定のMicrosoft hdinsightです。 ありがとう –

答えて

0

df = sc.parallelize([('ABCD1234',6.0,'None','None','None','None','None','None','None','None'), 
        ('WXYZ9999',40.0,'None','None','None','None','None','None','None','None')]).toDF(['Id','0','10','100','1000','10000','1000000','100000000','10235','1024']) 

あなたは次のコードを使用して、データをピボットすることができます

from pyspark.sql import functions as F 
from pyspark.sql.types import StringType 

kvp = F.explode(F.array([F.struct(F.lit(c).cast(StringType()).alias("Key"), F.col(c).cast(StringType()).alias("Value")) for c in df.columns if c!='Id'])).alias("kvp") 
df_pivoted = df.select(['Id'] + [kvp]).select(['Id'] + ["kvp.Key", "kvp.Value"]) 
df_pivoted.show() 

そして、あなたがにデータフレームを変換することによって、単一のCSVへの出力データをすることができますパンダ:

df_pivoted.toPandas().to_csv('e:/output.csv',index=False,header = 'true', sep='|') 

出力としては、

Id|Key|Value 
ABCD1234|0|6.0 
ABCD1234|10|None 
ABCD1234|100|None 
ABCD1234|1000|None 
ABCD1234|10000|None 
ABCD1234|1000000|None 
ABCD1234|100000000|None 
ABCD1234|10235|None 
ABCD1234|1024|None 
WXYZ9999|0|40.0 
WXYZ9999|10|None 
WXYZ9999|100|None 
WXYZ9999|1000|None 
WXYZ9999|10000|None 
WXYZ9999|1000000|None 
WXYZ9999|100000000|None 
WXYZ9999|10235|None 
WXYZ9999|1024|None 
0

これを見てください。

は、まずあなたがDF1に言及されていることRDDあなたは物事を試して、次のようにあなたが言及したデータセットとそのRDDを作成することができるデータフレーム

ではありませんので、予めご了承ください。

カラム名の接頭辞として '_'を使用していますが、純粋な数値はカラム名として直接使用できないことに注意してください。

>>> from pyspark.sql import Row 

>>> row1 = Row(_0=6.0, _10=None, _100=None, _1000=None, _10000=None, _1000000=None, 
      _100000000=None, _10235=None, _1024=None) 
>>> row2 = Row(_0=40.0, _10=None, _100=None, _1000=None, _10000=None, _1000000=None, 
      _100000000=None, _10235=None, _1024=None) 

>>> yourStartDataset = sc.parallelize([ 
             ['ABCD1234',row1], 
             ['WXYZ9999',row2] 
             ]) 

今、あなたのデータセットは、この

>>> yourStartDataset.take(2) 

[['ABCD1234', 
    Row(_0=6.0, _10=None, _100=None, _1000=None, _10000=None, _1000000=None, _100000000=None, _10235=None, _1024=None)], 
['WXYZ9999', 
    Row(_0=40.0, _10=None, _100=None, _1000=None, _10000=None, _1000000=None, _100000000=None, _10235=None, _1024=None)]] 

あなたは列の唯一の数字の部分を取得したい場合は今すぐ下の行は、魔法

>>> yourStartDataset.flatMapValues(lambda v: v.asDict().items()).map(lambda (a, (b, c)): (a, b, c)).collect() 

[('ABCD1234', '_1000000', None), 
('ABCD1234', '_100000000', None), 
('ABCD1234', '_100', None), 
('ABCD1234', '_10000', None), 
('ABCD1234', '_0', 6.0), 
('ABCD1234', '_1000', None), 
('ABCD1234', '_10', None), 
('ABCD1234', '_10235', None), 
('ABCD1234', '_1024', None), 
('WXYZ9999', '_1000000', None), 
('WXYZ9999', '_100000000', None), 
('WXYZ9999', '_100', None), 
('WXYZ9999', '_10000', None), 
('WXYZ9999', '_0', 40.0), 
('WXYZ9999', '_1000', None), 
('WXYZ9999', '_10', None), 
('WXYZ9999', '_10235', None), 
('WXYZ9999', '_1024', None)] 

をするか、またはだろう、のように見えます以下が実行されます

>>> yourStartDataset.flatMapValues(lambda v: v.asDict().items()).map(lambda (a, (b, c)): (a, b[1:], c)).collect() 

[('ABCD1234', '1000000', None), 
('ABCD1234', '100000000', None), 
('ABCD1234', '100', None), 
('ABCD1234', '10000', None), 
('ABCD1234', '0', 6.0), 
('ABCD1234', '1000', None), 
('ABCD1234', '10', None), 
('ABCD1234', '10235', None), 
('ABCD1234', '1024', None), 
('WXYZ9999', '1000000', None), 
('WXYZ9999', '100000000', None), 
('WXYZ9999', '100', None), 
('WXYZ9999', '10000', None), 
('WXYZ9999', '0', 40.0), 
('WXYZ9999', '1000', None), 
('WXYZ9999', '10', None), 
('WXYZ9999', '10235', None), 
('WXYZ9999', '1024', None)] 

これは役に立ちます。

関連する問題