あなたの要件を見ると、UDAF
aggregation
が最適です。より良い理解のためにdatabricksとragrawalをチェックアウトすることができます。
私が理解するものに応じてあなたにガイダンスを提供していますし、私はそれはあなたがUDAF
を定義するために必要なすべての
まず有用であると思います。上記のリンクを正常に読んだら、それを行うことができます。
private class ManosAggregateFunction(daysToCheck: Int, countsToCheck: Int) extends UserDefinedAggregateFunction {
var referenceDate: String = _
def inputSchema: StructType = new StructType().add("timestamp", StringType).add("count", IntegerType)
// the aggregation buffer can also have multiple values in general but
// this one just has one: the partial sum
def bufferSchema: StructType = new StructType().add("timestamp", StringType).add("count", IntegerType).add("days", IntegerType)
// returns just a double: the sum
def dataType: DataType = BooleanType
// always gets the same result
def deterministic: Boolean = true
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer.update(0, "")
buffer.update(1, 0)
buffer.update(2, 0)
referenceDate = ""
}
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val nowDate = input.getString(0)
val count = input.getInt(1)
buffer.update(0, nowDate)
buffer.update(1, count)
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
val previousDate = buffer1.getString(0)
val nowDate = buffer2.getString(0)
if(previousDate != "") {
val oldDate = LocalDate.parse(previousDate, formatter)
val newDate = LocalDate.parse(nowDate, formatter)
buffer1.update(2, buffer1.getInt(2)+(oldDate.toEpochDay() - newDate.toEpochDay()).toInt)
}
buffer1.update(0, buffer2.getString(0))
if(buffer1.getInt(2) < daysToCheck) {
buffer1.update(1, buffer1.getInt(1) + buffer2.getInt(1))
}
}
def evaluate(buffer: Row): Any = {
countsToCheck <= buffer.getInt(1)
}
}
上記
UDAF
で
、daysToCheck
とcountsToCheck
あなたの質問でX
とY
です。あなたが呼び出すことができます
定義さUDAF
val manosAgg = new ManosAggregateFunction(5,2)
df.orderBy($"timestamp".desc).groupBy("id").agg(manosAgg(col("timestamp"), col("count")).as("code")).show
以下のように最終的な出力は
+---+-----+
| id| code|
+---+-----+
| 1| true|
| 2|false|
+---+-----+
私はあなたがあなたの問題のためのアイデアを持っている願っています入力
val df = Seq(
(1, "2017-06-22", 1),
(1, "2017-06-23", 0),
(1, "2017-06-24", 1),
(2, "2017-06-28", 0),
(2, "2017-06-29", 1)
).toDF("id","timestamp","count")
+---+----------+-----+
|id |timestamp |count|
+---+----------+-----+
|1 |2017-06-22|1 |
|1 |2017-06-23|0 |
|1 |2017-06-24|1 |
|2 |2017-06-28|0 |
|2 |2017-06-29|1 |
+---+----------+-----+
を与えられています。 :)