0
私はその結果を取得したいと思いDF = [CUSTOMER_ID ,itemType, eventTimeStamp, valueType, value ]
集合関数内に条件を持たせる方法:Scala/Spark?
+-------------+-----------+-------------+--------+--------------------+
| CUSTOMER_ID | itemType | valueType | value | eventTimeStamp |
+-------------+-----------+--------+---------------------------------+
| 1 | null | dvd | 12 |2016-09-19T00:00:00Z
| 1 | rent | dvd | 12 |2016-09-19T00:00:00Z
| 1 | buy | tv | 12 |2016-09-20T00:00:00Z
| 1 | rent | null | 12 |2016-09-20T00:00:00Z
| 1 | buy | movie | 12 |2016-09-18T00:00:00Z
| 1 | null | null | 12 |2016-09-18T00:00:00Z
+-------------+-----------+-------------+--------+---------------------+
ている:これは私にはnullを含む結果を与える
val temp = df.groupBy("CUSTOMER_ID").agg(
collectAsList(df("itemType")).alias("itemCount"),
collectAsList(df("valueType")).alias("valueTypeCount"),
sum("value") as "totalValues")
val stage1 = temp.withColumn("valueTypeMap", count_by_value(col("valueTypeCount")))
.withColumn("itemTypeMap", count_by_value(col("itemCount")))
.drop("itemCount")
.drop("valueTypeCount")
val toMap = udf { (typ: String, count: Int) => Map(typ -> count) }
val count_by_value = udf {(value :scala.collection.mutable.WrappedArray[String]) => if (value == null) null else value.groupBy(identity).mapValues(_.size)}
val collectAsList = new CollectListFunction(StringType)
import org.apache.spark.sql.functions.{dayofmonth, countDistinct}
val stage2 = df.groupBy("CUSTOMER_ID", "itemType")
.agg(countDistinct(dayofmonth(col("eventTimeStamp"))) as "daysPeritemType")
.withColumn("itemTypeForDay", toMap(col("itemType"), col("daysPeritemType")))
.groupBy("CUSTOMER_ID").agg(CombineMaps(col("itemTypeForDay")) as "resultMap")
val result = stage1.join(stage2, stage1("CUSTOMER_ID") === stage2("CUSTOMER_ID"))
.drop(stage2("CUSTOMER_ID"))
:
CUSTOMER_ID : 1
totalValue : 72 --- group by based on id
itemTypeMap : {"rent" : 2, "buy" : 2} --- group by based on id (without null)
valueTypeMap : {"dvd" : 2, "tv" : 1, "movie" : 1 } --- group by based on id
itemTypeForDay : {"rent: 2, "buy" : 2 } --- group By based on id and dayofmonth(col("eventTimeStamp")) atmost 1 type per day
マイコードを。どのようにして集計を実行中にnullを避ける。私は完全にヌル行/列を削除したいとは思わない。特定の行の集計を実行するときにそれらを避けるだけです。
Utilのクラス:
case class Data(totalValue: Long, typeCount: Map[String,Int], typeForDay: Map[String,Int] ,itemCount : Map[String,Int]) extends Serializable
def convertToRDD(result : DataFrame): RDD[(String, String)] = {
val tempFile = result.map({
r => {
val customerId = r.getAs[String]("CUSTOMER_ID")
val totalValue = r.getAs[Long]("totalValue")
val typeCount = r.getAs[Map[String, Int]]("typeCount")
val itemCount = r.getAs[Map[String, Int]]("itemCount")
val typeForDay = r.getAs[Map[String, Int]]("typeForDay")
val features = Data(totalValue, typeCount, typeForDay, itemCount)
val jsonString = JacksonUtil.toJson(features)
(customerId, jsonString)
}
})
return tempFile
}
class CollectListFunction[T] (val colType: DataType) extends UserDefinedAggregateFunction {
def inputSchema: StructType =
new StructType().add("inputCol", colType)
def bufferSchema: StructType =
new StructType().add("outputCol", ArrayType(colType))
def dataType: DataType = ArrayType(colType)
def deterministic: Boolean = true
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer.update(0, new scala.collection.mutable.ArrayBuffer[T])
}
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val list = buffer.getSeq[T](0)
if (!input.isNullAt(0)) {
val sales = input.getAs[T](0)
buffer.update(0, list:+sales)
}
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1.update(0, buffer1.getSeq[T](0) ++ buffer2.getSeq[T](0))
}
def evaluate(buffer: Row): Any = {
buffer.getSeq[T](0)
}
}
object CombineMaps extends UserDefinedAggregateFunction {
override def inputSchema: StructType = new StructType().add("map", dataType)
override def bufferSchema: StructType = inputSchema
override def dataType: DataType = MapType(StringType, IntegerType)
override def deterministic: Boolean = true
override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0 , Map[String, Int]())
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val before = buffer.getAs[Map[String, Int]](0)
val toAdd = input.getAs[Map[String, Int]](0)
val result = before ++ toAdd
buffer.update(0, result)
}
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = update(buffer1, buffer2)
override def evaluate(buffer: Row): Any = buffer.getAs[Map[String, Int]](0)
}
いくつかのいずれかは、私がここで間違ってやっているものを私に伝えることができますか?