2017-10-30 13 views
1

サブグループ内の対応する列の最初の値を取得することは可能ですか?Dataframeは、対応する列の最初と最後の値を取得します。

import org.apache.spark.sql.SparkSession 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.expressions.{Window, WindowSpec} 

object tmp { 
    def main(args: Array[String]): Unit = { 
    val spark = SparkSession.builder().master("local").getOrCreate() 
    import spark.implicits._ 

    val input = Seq(
     (1235, 1, 1101, 0), 
     (1235, 2, 1102, 0), 
     (1235, 3, 1103, 1), 
     (1235, 4, 1104, 1), 
     (1235, 5, 1105, 0), 
     (1235, 6, 1106, 0), 
     (1235, 7, 1107, 1), 
     (1235, 8, 1108, 1), 
     (1235, 9, 1109, 1), 
     (1235, 10, 1110, 0), 
     (1235, 11, 1111, 0) 
    ).toDF("SERVICE_ID", "COUNTER", "EVENT_ID", "FLAG") 

    lazy val window: WindowSpec = Window.partitionBy("SERVICE_ID").orderBy("COUNTER") 
    val firsts = input.withColumn("first_value", first("EVENT_ID", ignoreNulls = true).over(window.rangeBetween(Long.MinValue, Long.MaxValue))) 
    firsts.orderBy("SERVICE_ID", "COUNTER").show() 

    } 
} 

出力が必要です。

最初の(または前の)フラグに基づいて列EVENT_IDの値= FLAGに基づいて、列EVENT_IDの1 と 最終(または次の)値= SERVICE_IDによって1つの パーティションはカウンタによってソート

+----------+-------+--------+----+-----------+-----------+ 
|SERVICE_ID|COUNTER|EVENT_ID|FLAG|first_value|last_value| 
+----------+-------+--------+----+-----------+-----------+ 
|  1235|  1| 1101| 0|   0|  1103| 
|  1235|  2| 1102| 0|   0|  1103| 
|  1235|  3| 1103| 1|   0|  1106| 
|  1235|  4| 1104| 0|  1103|  1106| 
|  1235|  5| 1105| 0|  1103|  1106| 
|  1235|  6| 1106| 1|   0|  1108| 
|  1235|  7| 1107| 0|  1106|  1108| 
|  1235|  8| 1108| 1|   0|  1109| 
|  1235|  9| 1109| 1|   0|  1110| 
|  1235|  10| 1110| 1|   0|   0| 
|  1235|  11| 1111| 0|  1110|   0| 
|  1235|  12| 1112| 0|  1110|   0| 
+----------+-------+--------+----+-----------+-----------+ 

答えて

0

ファーストデータフレームをグループ化する必要があります。新しいグループは、「TIME」の欄は、データフレームに列「ID」を追加最初に、これを行うには1に等しい。各開始時刻:

lazy val window: WindowSpec = Window.partitionBy("SERVICE_ID").orderBy("COUNTER") 
val df_flag = input.filter($"FLAG" === 1) 
    .withColumn("ID", row_number().over(window)) 
val df_other = input.filter($"FLAG" =!= 1) 
    .withColumn("ID", lit(0)) 

// Create a group for each flag event 
val df = df_flag.union(df_other) 
    .withColumn("ID", max("ID").over(window.rowsBetween(Long.MinValue, 0))) 
    .cache() 

df.show()います:

+----------+-------+--------+----+---+ 
|SERVICE_ID|COUNTER|EVENT_ID|FLAG| ID| 
+----------+-------+--------+----+---+ 
|  1235|  1| 1111| 1| 1| 
|  1235|  2| 1112| 0| 1| 
|  1235|  3| 1114| 0| 1| 
|  1235|  4| 2221| 1| 2| 
|  1235|  5| 2225| 0| 2| 
|  1235|  6| 2226| 0| 2| 
|  1235|  7| 2227| 1| 3| 
+----------+-------+--------+----+---+ 

を今、我々イベントを分ける列がある場合は、各イベントに正しい "EVENT_ID"(名前が "first_value"に変更されている)を追加する必要があります。 「first_value」に加えて、次のフラグ付きイベントのIDである第2列「last_value」を計算して追加します。

val df_event = df.filter($"FLAG" === 1) 
    .select("EVENT_ID", "ID", "SERVICE_ID", "COUNTER") 
    .withColumnRenamed("EVENT_ID", "first_value") 
    .withColumn("last_value", lead($"first_value",1,0).over(window)) 
    .drop("COUNTER") 

val df_final = df.join(df_event, Seq("ID", "SERVICE_ID")) 
    .drop("ID") 
    .withColumn("first_value", when($"FLAG" === 1, lit(0)).otherwise($"first_value")) 

df_final.show()たちを与える:

+----------+-------+--------+----+-----------+----------+ 
|SERVICE_ID|COUNTER|EVENT_ID|FLAG|first_value|last_value| 
+----------+-------+--------+----+-----------+----------+ 
|  1235|  1| 1111| 1|   0|  2221| 
|  1235|  2| 1112| 0|  1111|  2221| 
|  1235|  3| 1114| 0|  1111|  2221| 
|  1235|  4| 2221| 1|   0|  2227| 
|  1235|  5| 2225| 0|  2221|  2227| 
|  1235|  6| 2226| 0|  2221|  2227| 
|  1235|  7| 2227| 1|   0|   0| 
+----------+-------+--------+----+-----------+----------+ 
+0

非常に便利。私は600万行のクラスターでこれを実行することはできません。しかし、その前に私はフラグ(私の元の投稿を編集)のための最後(または次)の値の別の列を追加する必要があります。 – xstack2000

+0

@ xstack2000既に回答を受け取ってからあまり追加しないようにしてください。回答が古くなり、将来の訪問者を混乱させる可能性があります。しかし、あなたが望むカラムを追加しました。ウィンドウの後の行を見る 'lead'関数を使っています。 – Shaido

+0

あなたの答えはとても役に立ちます。私は元の投稿にもっと追加しましたが、あなたの返事に感謝します。他の人がそれを見ることができる場合は、私が追加した連続したレコードに2つのフラグがあるかどうかを処理する方法をまだ必要としています。 しかし私はあなたの答えを私の答えとします。ありがとう。 – xstack2000

0

は二段階で解決することができます:

  1. は、このイベントのために "FLAG" == 1と有効範囲のイベントを取得します。
  2. は、入力と入力、範囲で結合します。いくつかの列の名前の変更は、視認性のために含ま

は、短縮することができます。

val window = Window.partitionBy("SERVICE_ID").orderBy("COUNTER").rowsBetween(Window.currentRow, 1) 
val eventRangeDF = input.where($"FLAG" === 1) 
    .withColumn("RANGE_END", max($"COUNTER").over(window)) 
    .withColumnRenamed("COUNTER", "RANGE_START") 
    .select("SERVICE_ID", "EVENT_ID", "RANGE_START", "RANGE_END") 
eventRangeDF.show(false) 

val result = input.where($"FLAG" === 0).as("i").join(eventRangeDF.as("e"), 
    expr("e.SERVICE_ID=i.SERVICE_ID And i.COUNTER>e.RANGE_START and i.COUNTER<e.RANGE_END")) 
    .select($"i.SERVICE_ID", $"i.COUNTER", $"i.EVENT_ID", $"i.FLAG", $"e.EVENT_ID".alias("first_value")) 
    // include FLAG=1 
    .union(input.where($"FLAG" === 1).select($"SERVICE_ID", $"COUNTER", $"EVENT_ID", $"FLAG", lit(0).alias("first_value"))) 

result.sort("COUNTER").show(false) 

出力:

+----------+--------+-----------+---------+ 
|SERVICE_ID|EVENT_ID|RANGE_START|RANGE_END| 
+----------+--------+-----------+---------+ 
|1235  |1111 |1   |4  | 
|1235  |2221 |4   |7  | 
|1235  |2227 |7   |7  | 
+----------+--------+-----------+---------+ 

+----------+-------+--------+----+-----------+ 
|SERVICE_ID|COUNTER|EVENT_ID|FLAG|first_value| 
+----------+-------+--------+----+-----------+ 
|1235  |1  |1111 |1 |0   | 
|1235  |2  |1112 |0 |1111  | 
|1235  |3  |1114 |0 |1111  | 
|1235  |4  |2221 |1 |0   | 
|1235  |5  |2225 |0 |2221  | 
|1235  |6  |2226 |0 |2221  | 
|1235  |7  |2227 |1 |0   | 
+----------+-------+--------+----+-----------+ 
+0

Pashaありがとうございます。私はクラスターに対してこれを測定し、応答を返さなければならないと思う。ところで、私は元の投稿を編集しました。それをもっと増やして申し訳ありません。 – xstack2000

関連する問題