サブグループ内の対応する列の最初の値を取得することは可能ですか?Dataframeは、対応する列の最初と最後の値を取得します。
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.{Window, WindowSpec}
object tmp {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._
val input = Seq(
(1235, 1, 1101, 0),
(1235, 2, 1102, 0),
(1235, 3, 1103, 1),
(1235, 4, 1104, 1),
(1235, 5, 1105, 0),
(1235, 6, 1106, 0),
(1235, 7, 1107, 1),
(1235, 8, 1108, 1),
(1235, 9, 1109, 1),
(1235, 10, 1110, 0),
(1235, 11, 1111, 0)
).toDF("SERVICE_ID", "COUNTER", "EVENT_ID", "FLAG")
lazy val window: WindowSpec = Window.partitionBy("SERVICE_ID").orderBy("COUNTER")
val firsts = input.withColumn("first_value", first("EVENT_ID", ignoreNulls = true).over(window.rangeBetween(Long.MinValue, Long.MaxValue)))
firsts.orderBy("SERVICE_ID", "COUNTER").show()
}
}
出力が必要です。
最初の(または前の)フラグに基づいて列EVENT_IDの値= FLAGに基づいて、列EVENT_IDの1 と 最終(または次の)値= SERVICE_IDによって1つの パーティションはカウンタによってソート+----------+-------+--------+----+-----------+-----------+
|SERVICE_ID|COUNTER|EVENT_ID|FLAG|first_value|last_value|
+----------+-------+--------+----+-----------+-----------+
| 1235| 1| 1101| 0| 0| 1103|
| 1235| 2| 1102| 0| 0| 1103|
| 1235| 3| 1103| 1| 0| 1106|
| 1235| 4| 1104| 0| 1103| 1106|
| 1235| 5| 1105| 0| 1103| 1106|
| 1235| 6| 1106| 1| 0| 1108|
| 1235| 7| 1107| 0| 1106| 1108|
| 1235| 8| 1108| 1| 0| 1109|
| 1235| 9| 1109| 1| 0| 1110|
| 1235| 10| 1110| 1| 0| 0|
| 1235| 11| 1111| 0| 1110| 0|
| 1235| 12| 1112| 0| 1110| 0|
+----------+-------+--------+----+-----------+-----------+
非常に便利。私は600万行のクラスターでこれを実行することはできません。しかし、その前に私はフラグ(私の元の投稿を編集)のための最後(または次)の値の別の列を追加する必要があります。 – xstack2000
@ xstack2000既に回答を受け取ってからあまり追加しないようにしてください。回答が古くなり、将来の訪問者を混乱させる可能性があります。しかし、あなたが望むカラムを追加しました。ウィンドウの後の行を見る 'lead'関数を使っています。 – Shaido
あなたの答えはとても役に立ちます。私は元の投稿にもっと追加しましたが、あなたの返事に感謝します。他の人がそれを見ることができる場合は、私が追加した連続したレコードに2つのフラグがあるかどうかを処理する方法をまだ必要としています。 しかし私はあなたの答えを私の答えとします。ありがとう。 – xstack2000