使用してデータフレームのAPI:
from pyspark.sql.types import StructType, StringType, LongType,StructField
from pyspark import SparkConf,SparkContext
from pyspark.sql import SparkSession
sc= spark.sparkContext
rdd = sc.parallelize([(11, 100),(11, 150),(12, 50),(12, 70),(12, 20)])
schema = StructType([
StructField("id", StringType()),
StructField("amount", LongType())
])
df = spark.createDataFrame(rdd, schema)
df.registerTempTable("amount_table")
df.show();
df2 = spark.sql("SELECT id,amount, sum(amount) OVER (PARTITION BY id ORDER BY amount) as cumulative_sum FROM amount_table")
df2.show()
RDDのAPIを使用したが、これを試してみてください。
rdd = sc.parallelize([(11, 1, 2, 100), (11, 2, 1, 150), (12, 1, 2, 50), (12, 1, 3, 70), (12, 3, 4, 20)])
def get_key_value(rec):
# for grouping as key value
return rec[0], rec[1:]
from itertools import accumulate
def cumsum(values):
return [k[0]+[k[1]] for k in zip([[i[0],i[1]] for i in values], accumulate([i[2] for i in values]))]
print(rdd.map(get_key_value).collect()) # output after get_key_value
print(rdd.map(get_key_value).groupByKey().mapValues(cumsum).flatMapValues(lambda x:x).map(lambda x: [x[0]]+x[1]).collect())
出力:
[(11, (1, 2, 100)), (11, (2, 1, 150)), (12, (1, 2, 50)), (12, (1, 3, 70)), (12, (3, 4, 20))]
[[11, 1, 2, 100], [11, 2, 1, 250], [12, 1, 2, 50], [12, 1, 3, 120], [12, 3, 4, 140]]
2つの列のみ(2つの値のを含む単純な例各レコード)
rdd=sc.parallelize([(11, 100), (11, 150), (12, 50), (12, 70), (12, 20)])
from itertools import accumulate
def cumsum(values):
return list(accumulate(values))
print(rdd.groupByKey().mapValues(cumsum).collect())
print(rdd.groupByKey().mapValues(cumsum).flatMapValues(lambda x:x).collect())
出力:
[(11, [100, 250]), (12, [50, 120, 140])]
[(11, 100), (11, 250), (12, 50), (12, 120), (12, 140)]
は、最も単純な答えは次のようになります。ウィンドウ関数でデータフレームを使用しています。コードははるかに簡単になります –
最も簡単な*もちろん;) –
可能であれば、データフレームなしでやってみてください、あなたのコメントありがとう。 – RoyR