2017-04-05 6 views
0

私はscalaとsparkで新しく "path"フィールドを爆発させる方法を知らず、1回のパスで最大と最小の "event_dttm"フィールドを見つけました。 は、私がデータを持っている:配列[文字列]フィールドとグループデータを1回のパスで分解する方法

val weblog=sc.parallelize(Seq(
    ("39f0412b4c91","staticnavi.com", Seq("panel", "cm.html"), 1424954530, "SO.01"), 
    ("39f0412b4c91","staticnavi.com", Seq("panel", "cm.html"), 1424964830, "SO.01"), 
    ("39f0412b4c91","staticnavi.com", Seq("panel", "cm.html"), 1424978445, "SO.01"), 
    )).toDF("id","domain","path","event_dttm","load_src") 

私は次の結果を得るために行う必要があります。

"id"  | "domain" |"newPath" | "max_time" | min_time | "load_src" 
39f0412b4c91|staticnavi.com| panel | 1424978445 | 1424954530 | SO.01 
39f0412b4c91|staticnavi.com| cm.html | 1424978445 | 1424954530 | SO.01 

を私はそれが行関数を介して可能リアライズだと思いますが、方法がわかりません。

答えて

1

あなたはgroupBy集約続く、explode()を探しています:

import org.apache.spark.sql.functions.{explode, min, max} 

var result = weblog.withColumn("path", explode($"path")) 
    .groupBy("id","domain","path","load_src") 
    .agg(min($"event_dttm").as("min_time"), 
     max($"event_dttm").as("max_time")) 

result.show() 
+------------+--------------+-------+--------+----------+----------+ 
|   id|  domain| path|load_src| min_time| max_time| 
+------------+--------------+-------+--------+----------+----------+ 
|39f0412b4c91|staticnavi.com| panel| SO.01|1424954530|1424978445| 
|39f0412b4c91|staticnavi.com|cm.html| SO.01|1424954530|1424978445| 
+------------+--------------+-------+--------+----------+----------+ 
+0

ありがとう!正常に動作します。 explodeを使用しない別の方法はありますか? – Fred

+0

'rdd' APIを使用していますが、それはもっと精巧で潜在的に遅くなるでしょう。 – mtoto

+0

flatMapで解決策を見つけました:val result = weblog.flatMap { case Row(id:String、domain:String、path:String、event_dttm:Long、load_src:String、ymd:String)=> { パスです。 (x)、BigInt(event_dttm)、load_src、ymd)) }}} – Fred

関連する問題