2017-01-11 7 views
0

RDDには、geog、製品、時間、価格のような4列のデータがあります。ジオグと時間に基づいてランニング合計を計算したいと思います。データ入手方法Spark scala RDDを使用して2つの列に基づいて合計を実行する

Source

考える

は、私は次のようになる必要があります。

[Need Output like this]

私はこの火花Scalaの-RDDを必要としています。私はこのScalaの世界に慣れていないので、SQLでこれを簡単に実現できます。私はspark-Scala -RDD(map、flatmap)のようにこれをしたい。

アドバイスありがとうございます。

+0

いずれのコードも私たちにとって動機付けであったでしょう。 –

+0

アキュムレータを使用して合計を追跡する必要があります。 – code

+0

@balaji RDDをDataFrameに変換して既存のSQLを再利用することができます:) –

答えて

2

これは、ウィンドウ関数を定義することで可能です:

>>> val data = List(
    ("India","A1","Q1",40), 
    ("India","A2","Q1",30), 
    ("India","A3","Q1",21), 
    ("German","A1","Q1",50), 
    ("German","A3","Q1",60), 
    ("US","A1","Q1",60), 
    ("US","A2","Q2",25), 
    ("US","A4","Q1",20), 
    ("US","A5","Q5",15), 
    ("US","A3","Q3",10) 
) 

>>> val df = sc.parallelize(data).toDF("country", "part", "quarter", "result") 
>>> df.show() 

+-------+----+-------+------+ 
|country|part|quarter|result| 
+-------+----+-------+------+ 
| India| A1|  Q1| 40| 
| India| A2|  Q1| 30| 
| India| A3|  Q1| 21| 
| German| A1|  Q1| 50| 
| German| A3|  Q1| 60| 
|  US| A1|  Q1| 60| 
|  US| A2|  Q2| 25| 
|  US| A4|  Q1| 20| 
|  US| A5|  Q5| 15| 
|  US| A3|  Q3| 10| 
+-------+----+-------+------+ 

>>> val window = Window.partitionBy("country").orderBy("part", "quarter") 
>>> val resultDF = df.withColumn("agg", sum(df("result")).over(window)) 
>>> resultDF.show() 

+-------+----+-------+------+---+ 
|country|part|quarter|result|agg| 
+-------+----+-------+------+---+ 
| India| A1|  Q1| 40| 40| 
| India| A2|  Q1| 30| 70| 
| India| A3|  Q1| 21| 91| 
|  US| A1|  Q1| 60| 60| 
|  US| A2|  Q2| 25| 85| 
|  US| A3|  Q3| 10| 95| 
|  US| A4|  Q1| 20|115| 
|  US| A5|  Q5| 15|130| 
| German| A1|  Q1| 50| 50| 
| German| A3|  Q1| 60|110| 
+-------+----+-------+------+---+ 

あなたは、この使用してウィンドウの機能を行うことができますが、Windowsの程度Databrickのブログを見てみてください。 https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

は、この情報がお役に立てば幸いです。

ハッピースパーク!乾杯、Fokko

+0

助けてくれてありがとうフォッコ。私はRDDでも試しました。 – balaji

1

これは他の人にも役立つと思います。私はSCALA RDDで試しました。

val fileName_test_1 ="C:\\venkat_workshop\\Qintel\\Data_Files\\test_1.txt" 


    val rdd1 = sc.textFile(fileName_test_1).map { x => (x.split(",")(0).toString() , 
                  x.split(",")(1).toString(), 
                  x.split(",")(2).toString(), 
                  x.split(",")(3).toDouble 
                 ) 
                }.groupBy(x => (x._1,x._3)) 
                .mapValues 
                  { 
                   _.toList.sortWith 
                   { 
                   (a,b) => (a._4) > (b._4) 
                   }.scanLeft("","","",0.0,0.0){ 
                   (a,b) => (b._1,b._2,b._3,b._4,b._4+a._5) 
                   }.tail 
                  }.flatMapValues(f => f).values 
関連する問題