2017-12-18 24 views
1

Sparkで累積合計したいと思います。Sparkの累積合計

+---------------+-------------------+----+----+----+ 
|  product_id|   date_time| ack|val1|val2| 
+---------------+-------------------+----+----+----+ 
|4008607333T.upf|2017-12-13:02:27:01|3-46| 53| 52| 
|4008607333T.upf|2017-12-13:02:27:03|3-47| 53| 52| 
|4008607333T.upf|2017-12-13:02:27:08|3-46| 53| 52| 
|4008607333T.upf|2017-12-13:02:28:01|3-47| 53| 52| 
|4008607333T.upf|2017-12-13:02:28:07|3-46| 15| 1| 
+---------------+-------------------+----+----+----+ 

ハイブクエリ:

select *, SUM(val1) over (Partition by product_id, ack order by date_time rows between unbounded preceding and current row) val1_sum, SUM(val2) over (Partition by product_id, ack order by date_time rows between unbounded preceding and current row) val2_sum from test 

出力:スパーク・ロジックを使用して

+---------------+-------------------+----+----+----+-------+--------+ 
|  product_id|   date_time| ack|val1|val2|val_sum|val2_sum| 
+---------------+-------------------+----+----+----+-------+--------+ 
|4008607333T.upf|2017-12-13:02:27:01|3-46| 53| 52|  53|  52| 
|4008607333T.upf|2017-12-13:02:27:08|3-46| 53| 52| 106|  104| 
|4008607333T.upf|2017-12-13:02:28:07|3-46| 15| 1| 121|  105| 
|4008607333T.upf|2017-12-13:02:27:03|3-47| 53| 52|  53|  52| 
|4008607333T.upf|2017-12-13:02:28:01|3-47| 53| 52| 106|  104| 
+---------------+-------------------+----+----+----+-------+--------+ 

、私は取得しています上記と同じ出力:

import org.apache.spark.sql.expressions.Window 
val w = Window.partitionBy('product_id, 'ack).orderBy('date_time) 
import org.apache.spark.sql.functions._ 

val newDf = inputDF.withColumn("val_sum", sum('val1) over w).withColumn("val2_sum", sum('val2) over w) 
newDf.show 
ここレジスタテーブル(入力)であります

ただし、このlogiを試してみるとスパーククラスタval_sumのcは、累積合計の半分になります。スパーククラスターでなぜそれが起こっているのか分かりません。それはパーティションのためですか?

スパーククラスターの列の累積合計をどのようにすることができますか?

答えて

1

DataFrame APIを使用して累積合計を取得するには、rowsBetweenウィンドウメソッドを設定する必要があります。 Spark 2.1以降では:

val w = Window.partitionBy($"product_id", $"ack") 
    .orderBy($"date_time") 
    .rowsBetween(Window.unboundedPreceding, Window.currentRow) 

これは、パーティションの先頭から現在の行までの値を使用するようにSparkに指示します。古いバージョンのSparkを使用する場合は、同じ効果としてrowsBetween(Long.MinValue, 0)を使用してください。

ウィンドウを使用するには、以前と同じ方法を使用します。私。

val newDf = inputDF.withColumn("val_sum", sum($"val1").over(w)) 
    .withColumn("val2_sum", sum($"val2").over(w)) 
+0

あなたはhttps://stackoverflow.com/questions/47908545/how-to-remember-the-previous-batch-of-spark-streaming-to-calculate-cumulative-suのための答えを記入してくださいすることができます – lucy