2017-04-12 1 views
0

これは、この入力ファイルを考えると、スパーク2.1である:Sparkデータフレームのネストされた配列の構造体の値を合計するにはどうすればよいですか?

order.json

{"id":1,"price":202.30,"userid":1} 
{"id":2,"price":343.99,"userid":1} 
{"id":3,"price":399.99,"userid":2} 

そして、次のデータフレーム:

val order = sqlContext.read.json("order.json") 
val df2 = order.select(struct("*") as 'order) 
val df3 = df2.groupBy("order.userId").agg(collect_list($"order").as("array")) 

DF3は以下の内容を持っています:

+------+---------------------------+ 
|userId|array      | 
+------+---------------------------+ 
|1  |[[1,202.3,1], [2,343.99,1]]| 
|2  |[[3,399.99,2]]    | 
+------+---------------------------+ 

と構造:私はuserIdをごとにアレイを有するを利用して、各useridにarray.priceの合計を計算したい

  1. root 
    |-- userId: long (nullable = true) 
    |-- array: array (nullable = true) 
    | |-- element: struct (containsNull = true) 
    | | |-- id: long (nullable = true) 
    | | |-- price: double (nullable = true) 
    | | |-- userid: long (nullable = true) 
    

    は、今私がDF3を与えられていますと仮定行。

  2. 結果のデータフレームの新しい列にこの計算を追加します。私がdf3.withColumn( "sum"、lit(0))を行ったが、lit(0)を私の計算に置き換えた場合のように。

私は両方のことに執着しています。私は配列全体にアクセスするための方法は見つけられませんでした(例えばfoldLeftを使って)。

答えて

1

私は残念ながら、アレイは、ここであなたに対して働く有するアレイ

を持つを利用して、各useridにarray.priceの合計を計算したいと思います。 Spark SQLもDataFrameもありませんDSLでは、このタスクを分解しないで、任意のサイズの配列でこのタスクを直接処理するためのツールを最初に提供しています(explode)。

import org.apache.spark.sql.Row 
import org.apache.spark.sql.functions.{col, udf} 

val totalPrice = udf((xs: Seq[Row]) => xs.map(_.getAs[Double]("price")).sum) 
df3.withColumn("totalPrice", totalPrice($"array")) 
+------+--------------------+----------+ 
|userId|    array|totalPrice| 
+------+--------------------+----------+ 
|  1|[[1,202.3,1], [2,...| 546.29| 
|  2|  [[3,399.99,2]]| 399.99| 
+------+--------------------+----------+ 

または静的Datasetを入力して変換します:

df3 
    .as[(Long, Seq[(Long, Double, Long)])] 
    .map{ case (id, xs) => (id, xs, xs.map(_._2).sum) } 
    .toDF("userId", "array", "totalPrice").show 
+------+--------------------+----------+ 
|userId|    array|totalPrice| 
+------+--------------------+----------+ 
|  1|[[1,202.3,1], [2,...| 546.29| 
|  2|  [[3,399.99,2]]| 399.99| 
+------+--------------------+----------+ 

あなたが分解し、集計上述したように:

あなたはUDFを使用することができます

import org.apache.spark.sql.functions.{sum, first} 

df3 
    .withColumn("price", explode($"array.price")) 
    .groupBy($"userId") 
    .agg(sum($"price"), df3.columns.tail.map(c => first(c).alias(c)): _*) 
+------+----------+--------------------+ 
|userId|sum(price)|    array| 
+------+----------+--------------------+ 
|  1| 546.29|[[1,202.3,1], [2,...| 
|  2| 399.99|  [[3,399.99,2]]| 
+------+----------+--------------------+ 

しかし、それは高価であり、既存の構造を使用していない。

は、あなたが使用することができ醜いトリックがあります:

import org.apache.spark.sql.functions.{coalesce, lit, max, size} 

val totalPrice = (0 to df3.agg(max(size($"array"))).as[Int].first) 
    .map(i => coalesce($"array.price".getItem(i), lit(0.0))) 
    .foldLeft(lit(0.0))(_ + _) 

df3.withColumn("totalPrice", totalPrice) 
+------+--------------------+----------+ 
|userId|    array|totalPrice| 
+------+--------------------+----------+ 
|  1|[[1,202.3,1], [2,...| 546.29| 
|  2|  [[3,399.99,2]]| 399.99| 
+------+--------------------+----------+ 

が、それは本当の解決策よりも好奇心です。

関連する問題