2016-09-01 13 views
1

異なる時間ステップで累積カウントを計算したい。私は各期間に発生したイベントの数を持っていますt:今度はその期間までの累積イベント数を欲しがります。ハードコーディングのないt期間の累積カウントを計算する

私はそれぞれの累積を別々に簡単に計算できますが、面倒です。私はUnionAllと一緒に戻すことができますが、これは時間がかかりすぎて面倒です。

これをもっときれいに行うことができますか?

package main.scala 

import java.io.File 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.functions._ 

object Test { 

    def main(args: Array[String]) { 

     // Spark and SQL Context (gives access to Spark and Spark SQL libraries) 
     val conf = new SparkConf().setAppName("Merger") 
     val sc = new SparkContext(conf) 
     val sqlContext = SQLContextSingleton.getInstance(sc) 
     import sqlContext.implicits._ 

     // Count 
     val count = Seq(("A",1,1),("A",1,2),("A",0,3),("A",0,4),("A",0,5),("A",1,6), 
         ("B",1,1),("B",0,2),("B",0,3),("B",1,4),("B",0,5),("B",1,6)) 
      .toDF("id","count","t") 

     val count2 = count.filter('t <= 2).groupBy('id).agg(sum("count"), max("t")) 

     val count3 = count.filter('t <= 3).groupBy('id).agg(sum("count"), max("t")) 

     count.show() 
     count2.show() 
     count3.show() 
    } 
} 

count

+---+-----+---+ 
| id|count| t| 
+---+-----+---+ 
| A| 1| 1| 
| A| 1| 2| 
| A| 0| 3| 
| A| 0| 4| 
| A| 0| 5| 
| A| 1| 6| 
| B| 1| 1| 
| B| 0| 2| 
| B| 0| 3| 
| B| 1| 4| 
| B| 0| 5| 
| B| 1| 6| 
+---+-----+---+ 

count2

+---+----------+------+ 
| id|sum(count)|max(t)| 
+---+----------+------+ 
| A|   2|  2| 
| B|   1|  2| 
+---+----------+------+ 

count3

+---+----------+------+ 
| id|sum(count)|max(t)| 
+---+----------+------+ 
| A|   2|  3| 
| B|   1|  3| 
+---+----------+------+ 

答えて

0

私はスパーク1.5.2 /スカラ座10とスパークでそれをテストしています2.0.0/Scala 11そしてそれは魅力のように働いた。 Spark 1.6.2では動作しませんでした。なぜなら、Hiveでコンパイルされていないからです。

package main.scala 

import java.io.File 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 
import org.apache.spark.sql.DataFrame 
import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.SQLContext 


object Test { 

    def main(args: Array[String]) { 

     val conf = new SparkConf().setAppName("Test") 
     val sc = new SparkContext(conf) 
     val sqlContext = SQLContextSingleton.getInstance(sc) 
     import sqlContext.implicits._ 

     val data = Seq(("A",1,1,1),("A",3,1,3),("A",0,0,2),("A",4,0,4),("A",0,0,6),("A",2,1,5), 
         ("B",0,1,3),("B",0,0,4),("B",2,0,1),("B",2,1,2),("B",0,0,6),("B",1,1,5)) 
      .toDF("id","param1","param2","t") 
     data.show() 

     data.withColumn("cumulativeSum1", sum("param1").over(Window.partitionBy("id").orderBy("t"))) 
      .withColumn("cumulativeSum2", sum("param2").over(Window.partitionBy("id").orderBy("t"))) 
      .show() 
    } 
} 

私が働いているの改善ではなくwithColumnを繰り返すことで、一度に複数の列に適用することができることです。入力を歓迎します!

0

私は方法であなたのデータを正規化するデ示唆あなたは1つのステップで累積を行うことができます。 このコードもスケールする必要があります(ドライバにコレクションが1つしかないため)。

私の例ではデータフレームAPIを使用していないため申し訳ありませんが(私はデータフレームをテストすることはできませんので、私の火花のインストールが多少borkedさ):

val count = sc.makeRDD(Seq(("A",1,1),("A",1,2),("A",0,3),("A",0,4),("A",0,5),("A",1,6), 
    ("B",1,1),("B",0,2),("B",0,3),("B",1,4),("B",0,5),("B",1,6))) 

// this is required only if number of timesteps is not known, this is the only operation that collects data to driver, and could even be broadcasted if large 
val distinctTimesteps = count.map(_._3).distinct().sortBy(e => e, true).collect() 

// this actually de-normalizes data so that it can be cumulated 
val deNormalizedData = count.flatMap { case (id, c, t) => 
    // the trick is making composite key consisting of distinct timestep and your id: (distTimestep, id) 
    distinctTimesteps.filter(distTimestep => distTimestep >= t).map(distTimestep => (distTimestep, id) -> c) 
} 

// just reduce by composite key and you are done 
val cumulativeCounts = deNormalizedData.reduceByKey(_ + _) 

// test 
cumulativeCounts.collect().foreach(print) 
関連する問題