これを理解するまでにはかなりの時間がかかりましたが、答えは簡単であることがわかりましたので、ここで解決策を投稿すると思いました。
まずkey
(顧客ID)によってすべてのトランザクションを減らす:
from operators import add
# ddf is a dataframe with a transaction in each row. Key is the column
# we want to group the transactions by.
txnrdd = ddf.rdd.map(lambda row: (row['key'], [row],)).reduceByKey(add)
これは(key, [list of Rows])
のように見えるrdd
を与えます。これをdataframe
に書き込むには、スキーマを構築する必要があります。トランザクションリストはArrayType
によってモデル化することができます。
from pyspark.sql import types as sqxt
txn_schema = sqxt.StructType([
sqxt.StructField('Key', sqxt.StringType()),
sqxt.StructField('Transactions', sqxt.ArrayType(ddf.schema))
])
それから、このような構造でディスクにデータを書き込むために簡単です:
txnddf = txnrdd.toDF(schema=txn_schema)
txnddf.write.parquet('customer-transactions.parquet')
パフォーマンスはOKらしいです。 RDDを経由せずにこれを行う方法を見つけることができませんでした。