1
Sparkで累積合計したいと思います。Sparkの累積合計
+---------------+-------------------+----+----+----+
| product_id| date_time| ack|val1|val2|
+---------------+-------------------+----+----+----+
|4008607333T.upf|2017-12-13:02:27:01|3-46| 53| 52|
|4008607333T.upf|2017-12-13:02:27:03|3-47| 53| 52|
|4008607333T.upf|2017-12-13:02:27:08|3-46| 53| 52|
|4008607333T.upf|2017-12-13:02:28:01|3-47| 53| 52|
|4008607333T.upf|2017-12-13:02:28:07|3-46| 15| 1|
+---------------+-------------------+----+----+----+
ハイブクエリ:
select *, SUM(val1) over (Partition by product_id, ack order by date_time rows between unbounded preceding and current row) val1_sum, SUM(val2) over (Partition by product_id, ack order by date_time rows between unbounded preceding and current row) val2_sum from test
出力:スパーク・ロジックを使用して
+---------------+-------------------+----+----+----+-------+--------+
| product_id| date_time| ack|val1|val2|val_sum|val2_sum|
+---------------+-------------------+----+----+----+-------+--------+
|4008607333T.upf|2017-12-13:02:27:01|3-46| 53| 52| 53| 52|
|4008607333T.upf|2017-12-13:02:27:08|3-46| 53| 52| 106| 104|
|4008607333T.upf|2017-12-13:02:28:07|3-46| 15| 1| 121| 105|
|4008607333T.upf|2017-12-13:02:27:03|3-47| 53| 52| 53| 52|
|4008607333T.upf|2017-12-13:02:28:01|3-47| 53| 52| 106| 104|
+---------------+-------------------+----+----+----+-------+--------+
、私は取得しています上記と同じ出力:
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy('product_id, 'ack).orderBy('date_time)
import org.apache.spark.sql.functions._
val newDf = inputDF.withColumn("val_sum", sum('val1) over w).withColumn("val2_sum", sum('val2) over w)
newDf.show
ここレジスタテーブル(入力)であります
ただし、このlogiを試してみるとスパーククラスタval_sum
のcは、累積合計の半分になります。スパーククラスターでなぜそれが起こっているのか分かりません。それはパーティションのためですか?
スパーククラスターの列の累積合計をどのようにすることができますか?
あなたはhttps://stackoverflow.com/questions/47908545/how-to-remember-the-previous-batch-of-spark-streaming-to-calculate-cumulative-suのための答えを記入してくださいすることができます – lucy