2016-05-14 11 views
0

pyspark.sql.dataframe.DataFrameに格納されたトランザクションを "key"列(この場合は顧客ID)を示す列 "key"でグループ化したいとします。Python Sparkグループのネストされたスキーマへのトランザクション

グループ化はので、私は、ネストされたスキーマ内のディスクにグループを書きたい、非常に高価なプロセスである:

(key, [[c1, c2, c3,...], ...]) 

これは私がすぐにキーにすべてのトランザクションをロードすることができ、かつ複合体を開発しますグループを再実行せずにカスタムアグリゲータを使用できます。

ネストされたスキーマを作成してディスクに書き込む方法を教えてください。

答えて

0

これを理解するまでにはかなりの時間がかかりましたが、答えは簡単であることがわかりましたので、ここで解決策を投稿すると思いました。

まずkey(顧客ID)によってすべてのトランザクションを減らす:

from operators import add 
# ddf is a dataframe with a transaction in each row. Key is the column 
# we want to group the transactions by. 

txnrdd = ddf.rdd.map(lambda row: (row['key'], [row],)).reduceByKey(add) 

これは(key, [list of Rows])のように見えるrddを与えます。これをdataframeに書き込むには、スキーマを構築する必要があります。トランザクションリストはArrayTypeによってモデル化することができます。

from pyspark.sql import types as sqxt 
txn_schema = sqxt.StructType([ 
    sqxt.StructField('Key', sqxt.StringType()), 
    sqxt.StructField('Transactions', sqxt.ArrayType(ddf.schema)) 
]) 

それから、このような構造でディスクにデータを書き込むために簡単です:

txnddf = txnrdd.toDF(schema=txn_schema) 
txnddf.write.parquet('customer-transactions.parquet') 

パフォーマンスはOKらしいです。 RDDを経由せずにこれを行う方法を見つけることができませんでした。

関連する問題