2017-06-23 9 views
0

問題を説明する最も良い方法は、入力の例と出力として出すものを与えることです。Sparkの出現回数をカウントした後にデータを分類する

(数の1秒の合計数は、最後のX日間等しいかYより高い)場合には、ロジックは、のようなものになるだろう入力

-------------------- 
|id|timestamp |count| 
| 1|2017-06-22| 1 | 
| 1|2017-06-23| 0 | 
| 1|2017-06-24| 1 | 
| 2|2017-06-22| 0 | 
| 2|2017-06-23| 1 | 

code = True 

code = False 

たとえば、X = 5Y = 2は、出力は

出力のようになります

入力がある
--------------------- 
id | code | 
1 | True | 
2 | False | 

SparkSQLdataframeorg.apache.spark.sql.DataFrame

は、非常に複雑な問題のような音はありませんが、私は上で立ち往生しています最初の一歩。私はdataframeのデータをロードすることしかできませんでした!

アイデア?

答えて

1

あなたの要件を見ると、UDAFaggregationが最適です。より良い理解のためにdatabricksragrawalをチェックアウトすることができます。

私が理解するものに応じてあなたにガイダンスを提供していますし、私はそれはあなたがUDAFを定義するために必要なすべての

まず有用であると思います。上記のリンクを正常に読んだら、それを行うことができます。

private class ManosAggregateFunction(daysToCheck: Int, countsToCheck: Int) extends UserDefinedAggregateFunction { 

    var referenceDate: String = _ 
    def inputSchema: StructType = new StructType().add("timestamp", StringType).add("count", IntegerType) 
    // the aggregation buffer can also have multiple values in general but 
    // this one just has one: the partial sum 
    def bufferSchema: StructType = new StructType().add("timestamp", StringType).add("count", IntegerType).add("days", IntegerType) 
    // returns just a double: the sum 
    def dataType: DataType = BooleanType 
    // always gets the same result 
    def deterministic: Boolean = true 

    def initialize(buffer: MutableAggregationBuffer): Unit = { 
    buffer.update(0, "") 
    buffer.update(1, 0) 
    buffer.update(2, 0) 
    referenceDate = "" 
    } 

    def update(buffer: MutableAggregationBuffer, input: Row): Unit = { 
    val nowDate = input.getString(0) 
    val count = input.getInt(1) 

    buffer.update(0, nowDate) 
    buffer.update(1, count) 
    } 

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { 
    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd") 
    val previousDate = buffer1.getString(0) 
    val nowDate = buffer2.getString(0) 
    if(previousDate != "") { 
     val oldDate = LocalDate.parse(previousDate, formatter) 
     val newDate = LocalDate.parse(nowDate, formatter) 
     buffer1.update(2, buffer1.getInt(2)+(oldDate.toEpochDay() - newDate.toEpochDay()).toInt) 
    } 
    buffer1.update(0, buffer2.getString(0)) 
    if(buffer1.getInt(2) < daysToCheck) { 
     buffer1.update(1, buffer1.getInt(1) + buffer2.getInt(1)) 
    } 
    } 

    def evaluate(buffer: Row): Any = { 
    countsToCheck <= buffer.getInt(1) 
    } 
} 
上記 UDAF

daysToCheckcountsToCheckあなたの質問でXYです。あなたが呼び出すことができます

定義さUDAF

val manosAgg = new ManosAggregateFunction(5,2) 
    df.orderBy($"timestamp".desc).groupBy("id").agg(manosAgg(col("timestamp"), col("count")).as("code")).show 

以下のように最終的な出力は

+---+-----+ 
| id| code| 
+---+-----+ 
| 1| true| 
| 2|false| 
+---+-----+ 

私はあなたがあなたの問題のためのアイデアを持っている願っています入力

val df = Seq(
    (1, "2017-06-22", 1), 
    (1, "2017-06-23", 0), 
    (1, "2017-06-24", 1), 
    (2, "2017-06-28", 0), 
    (2, "2017-06-29", 1) 
).toDF("id","timestamp","count") 
+---+----------+-----+ 
|id |timestamp |count| 
+---+----------+-----+ 
|1 |2017-06-22|1 | 
|1 |2017-06-23|0 | 
|1 |2017-06-24|1 | 
|2 |2017-06-28|0 | 
|2 |2017-06-29|1 | 
+---+----------+-----+ 

を与えられています。 :)

関連する問題