2016-12-29 6 views
0

以下は私のデータセットです。spark dataset API:他の集計とともに各ユーザーのデバイス使用状況の分布を確認

user,device,time_spent,video_start 
userA,mob,5,1 
userA,desk,5,2 
userA,desk,5,3 
userA,mob,5,2 
userA,mob,5,2 
userB,desk,5,2 
userB,mob,5,2 
userB,mob,5,2 
userB,desk,5,2 

私は、ユーザごとに集計を下回る知りたいです。

user  total_time_spent  device_distribution 
    userA   20    {mob:60%,desk:40%} 
    userB   20    {mob:50%,desk:50%} 

スパーク2.0 APIをJavaで使用してこれを達成できる人がいますか?私はUserDefinedAggregateFunctionを使ってみましたが、グループ内でグループをサポートしていないため、デバイスごとに各ユーザーグループをグループ化して、各デバイスで費やされた時間を集計する必要があります。

答えて

1

ここでは、pivot関数は非常に便利です。被験者のDatabricksのarticle

import org.apache.spark.sql.functions.udf 

case class DeviceDistribution(mob: String, desk: String) 

val makeDistribution = udf((mob: Long, desk: Long) => { 
    val mobPct = 100.0 * mob/(mob + desk) 
    val deskPct = 100.0 * desk/(mob + desk) 

    DeviceDistribution(s"$mobPct%", s"$deskPct%") 
}) 

// load your dataset 

data 
    .groupBy("user", "device") 
    .agg(sum("time_spent").as("total_time_spent_by_device")) 
    .groupBy("user") 
    .pivot("device", Seq("mob", "desk")) 
    .agg(first(col("total_time_spent_by_device"))) 
    .withColumn("total_time_spent", col("mob") + col("desk")) 
    .withColumn("device_distribution", makeDistribution(col("mob"), col("desk"))) 
    .select("user", "total_time_spent", "device_distribution") 
    .show 

// Result 
+-----+----------------+-------------------+ 
| user|total_time_spent|device_distribution| 
+-----+----------------+-------------------+ 
|userA|    25|  [60.0%,40.0%]| 
|userB|    20|  [50.0%,50.0%]| 
+-----+----------------+-------------------+ 

NB:あなたは集計関数を必要とするpivot機能付きコード(申し訳ありません、それはScalaのだが、それは、Javaにそれを翻訳するためには大きな問題になることはありません)のために。ここではデバイスごとに1つの値しかないので、単にfirstを使うことができます。

device_distributionカラム形式は、あなたが探しているまさにではなく:

  • あなたは自分の価値観であなたが望むすべてを行うことができますピボット行の後に(それはあなたが欲しい書式設定が含まれます)
  • 例えばとすると、出力データをjson形式で保存すると、これはまさに必要な形式になります。
1

フロランMoiny、

私の質問に答えるために感謝します。

しかし、このソリューションを本番環境にプッシュしたい場合は、このソリューションに問題があることがわかりました。

たとえば、TBデータソースでは、いくつのデバイスタイプが可能であるかを事前に知る必要があります。この状況では、イベントピボットもほとんど理解できません。

私はこの問題をJavaで完全に解決しました。あなたはここでそれを見ることができます。

私はこの目的のためにUserDefinedAggregateFunctionを使用しました。これは特に集計のためのUDFです。

基本的には、最初にユーザーとデバイスをグループ化してからこのカスタムUDFを呼び出して、同時にデバイス配布を検索し、ユーザーレベルで他の集計を行います。

https://github.com/himanshu-parmar-bigdata/spark-java-udf-demo

おかげで、 ヒマンシュ

関連する問題