2016-09-01 6 views
1

私はデータセットを持って スパークネストされたforeachの

{ "TS": "1461768452"、 "IP": "10.10.144.209"、 "ID": "KA4aIkFB"、 "DEVICE": "タブレット" 、「HOST」:「krxd.net」}

私はすべてのレコード(ID、デバイス)によって、以下の

グループを実行しようとしていて、各グループのためには、最新のタイムスタンプを取得します。 次に、IDを区別します。

Scalaでこれを行う方法を教えてもらえますか? 私は豚で知っているforeachループをネストすることができます。スパークに相当するものはありますか?

これは何かが、最後の2つのレコードが間違っているように私はこれまで

val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
val df = sqlContext1.read.json("sample.json") 
val df2 = df1.select(df1("ID"),df1("DEVICE"),df1("TS")) 
val res= df2.rdd.groupBy (x => (x(0),x(1))).mapValues (x=> x.foreach { x => x(2)}) 
val res1 = res.mapValues(_.maxBy(_.get(2))) 

が見えるなかったものです。

答えて

2

あなたは直接スパークSQL内(GroupedDataでGROUPBYと集計)、そのようなことを行うことができますし、RDDにデータフレームを変換する必要はありません。

テストJSONファイル:test.json

{"TS":"1461768452", "ID":"KA4aIkFA", "DEVICE":"Tablet", "HOST":"krxd.net" } 
{"TS":"1461768462", "ID":"KA4aIkFA", "DEVICE":"Tablet", "HOST":"krxd.net" } 
{"TS":"1461768472", "ID":"KA4aIkFB", "DEVICE":"Tablet", "HOST":"krxd.net" } 
{"TS":"1461768482", "ID":"KA4aIkFB", "DEVICE":"Tablet", "HOST":"krxd.net" } 
{"TS":"1461768492", "ID":"KA4aIkFB", "DEVICE":"Phone", "HOST":"krxd.net" } 

Scalaはコード:

val df = sqlContext.read.json("test.json") 
df.show 
+------+--------+--------+----------+ 
|DEVICE| HOST|  ID|  TS| 
+------+--------+--------+----------+ 
|Tablet|krxd.net|KA4aIkFA|1461768452| 
|Tablet|krxd.net|KA4aIkFA|1461768462| 
|Tablet|krxd.net|KA4aIkFB|1461768472| 
|Tablet|krxd.net|KA4aIkFB|1461768482| 
| Phone|krxd.net|KA4aIkFB|1461768492| 
+------+--------+--------+----------+ 

val newDF = df.select("ID", "DEVICE", "TS") 
       .groupBy("ID", "DEVICE") 
       .agg(max(df("TS")) as "TS") 
newDF.show() 
+--------+------+----------+ 
|  ID|DEVICE| TS  | 
+--------+------+----------+ 
|KA4aIkFB| Phone|1461768492| 
|KA4aIkFA|Tablet|1461768462| 
|KA4aIkFB|Tablet|1461768482| 
+--------+------+----------+ 

newDF.dropDuplicates("ID").show() 
+--------+------+----------+ 
|  ID|DEVICE| TS  | 
+--------+------+----------+ 
|KA4aIkFA|Tablet|1461768462| 
|KA4aIkFB| Phone|1461768492| 
+--------+------+----------+ 
+0

簡単で説得力のある回答です。同じ機能をJavaでも使えますか?なぜなら、JavaでdropDuplicates()関数について聞いたことはありませんでした。 – user4342532

+1

javaのdropDuplicates APIを確認できます。https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/DataFrame.html#dropDuplicates(java.lang.String []) – linbojin

+0

このLinboありがとう。できます! –