2017-03-20 28 views
2

私はプログラミングを初めて覚えました。私はこのような入力データを持っていて、各グループの累積サマリーを取得したい、火花のpythonプログラムの助けが必要です。誰かが私にこのことを案内してくれることを感謝します。Python Spark RDD APIを使用してグループ単位で累積合計を検索する方法

入力データ:

11,1,1,100

11,1,2,150

12,1,1,50

12,2,1,70

12,2,2,20

出力データは次のようになります。

11,1,1,100

11,1,2,250 //(100 + 150)

12,1,1,50

12,2,1,70

12,2,2,90 //(70 + 20)iが試み

コード:

def parseline(line): 
    fields = line.split(",") 
    f1 = float(fields[0]) 
    f2 = float(fields[1]) 
    f3 = float(fields[2]) 
    f4 = float(fields[3]) 
    return (f1, f2, f3, f4) 

input = sc.textFile("FIle:///...../a.dat") 
line = input.map(parseline) 
linesorted = line.sortBy(lambda x: (x[0], x[1], x[2])) 
runningpremium = linesorted.map(lambda y: (((y[0], y[1]),  y[3])).reduceByKey(lambda accum, num: accum + num) 

for i in runningpremium.collect(): 
     print i 
+2

は、最も単純な答えは次のようになります。ウィンドウ関数でデータフレームを使用しています。コードははるかに簡単になります –

+0

最も簡単な*もちろん;) –

+0

可能であれば、データフレームなしでやってみてください、あなたのコメントありがとう。 – RoyR

答えて

0

使用してデータフレームのAPI

from pyspark.sql.types import StructType, StringType, LongType,StructField 
from pyspark import SparkConf,SparkContext 
from pyspark.sql import SparkSession 
sc= spark.sparkContext 

rdd = sc.parallelize([(11, 100),(11, 150),(12, 50),(12, 70),(12, 20)]) 

schema = StructType([ 
    StructField("id", StringType()), 
    StructField("amount", LongType()) 
    ]) 

df = spark.createDataFrame(rdd, schema) 

df.registerTempTable("amount_table") 
df.show(); 
df2 = spark.sql("SELECT id,amount, sum(amount) OVER (PARTITION BY id ORDER BY amount) as cumulative_sum FROM amount_table") 
df2.show() 

RDDのAPIを使用したが、これを試してみてください。

rdd = sc.parallelize([(11, 1, 2, 100), (11, 2, 1, 150), (12, 1, 2, 50), (12, 1, 3, 70), (12, 3, 4, 20)]) 

def get_key_value(rec): 
    # for grouping as key value 
    return rec[0], rec[1:] 

from itertools import accumulate 

def cumsum(values): 
    return [k[0]+[k[1]] for k in zip([[i[0],i[1]] for i in values], accumulate([i[2] for i in values]))] 

print(rdd.map(get_key_value).collect()) # output after get_key_value 
print(rdd.map(get_key_value).groupByKey().mapValues(cumsum).flatMapValues(lambda x:x).map(lambda x: [x[0]]+x[1]).collect()) 

出力:

[(11, (1, 2, 100)), (11, (2, 1, 150)), (12, (1, 2, 50)), (12, (1, 3, 70)), (12, (3, 4, 20))] 
[[11, 1, 2, 100], [11, 2, 1, 250], [12, 1, 2, 50], [12, 1, 3, 120], [12, 3, 4, 140]] 

2つの列のみ(2つの値のを含む単純な例各レコード)

rdd=sc.parallelize([(11, 100), (11, 150), (12, 50), (12, 70), (12, 20)]) 
from itertools import accumulate 

def cumsum(values): 
    return list(accumulate(values)) 
print(rdd.groupByKey().mapValues(cumsum).collect()) 
print(rdd.groupByKey().mapValues(cumsum).flatMapValues(lambda x:x).collect()) 

出力:

[(11, [100, 250]), (12, [50, 120, 140])] 
[(11, 100), (11, 250), (12, 50), (12, 120), (12, 140)] 
+0

私はデータフレームがまだ利用できないsparkの下位バージョンを使用しています。したがって、あなたの答えを受け入れることはできません。しかし、あなたはRDDスタイルのソリューションを共有していただければ幸いです。 – RoyR

+0

@RoyR RDD APIの使用法も追加しました – Himaprasoon

1

コメントのように、あなたはスパークデータフレームに、ここで累積和を行うには、ウィンドウ関数を使用することができます。まず、我々はあなたが列cにより、カラムabその後、順序によって分割することができdummie列'a', 'b', 'c', 'd'

ls = [(11,1,1,100), (11,1,2,150), (12,1,1,50), (12,2,1,70), (12,2,2,20)] 
ls_rdd = spark.sparkContext.parallelize(ls) 
df = spark.createDataFrame(ls_rdd, schema=['a', 'b', 'c', 'd']) 

と例のデータフレームを作成することができます。続いて、最後に列dsum関数を適用

from pyspark.sql.window import Window 
import pyspark.sql.functions as func 

w = Window.partitionBy([df['a'], df['b']]).orderBy(df['c'].asc()) 
df_cumsum = df.select('a', 'b', 'c', func.sum(df.d).over(w).alias('cum_sum')) 
df_cumsum.sort(['a', 'b', 'c']).show() # simple sort column 

出力

+---+---+---+-------+ 
| a| b| c|cum_sum| 
+---+---+---+-------+ 
| 11| 1| 1| 100| 
| 11| 1| 2| 250| 
| 12| 1| 1|  50| 
| 12| 2| 1|  70| 
| 12| 2| 2|  90| 
+---+---+---+-------+ 
関連する問題