タイムスタンプ列を持つDFがあり、この列でソートされています。これを行う方法はありますか?各レコードについて、次のレコードにアクセスして2つの行の間の時間差を計算しますか? 2つの行が異なるノードで処理される可能性があるため、マップ関数ではこれが可能ではないと私は考えています。Spark Dataframe:マップ関数の次のレコードへのアクセス
ありがとうございました!
タイムスタンプ列を持つDFがあり、この列でソートされています。これを行う方法はありますか?各レコードについて、次のレコードにアクセスして2つの行の間の時間差を計算しますか? 2つの行が異なるノードで処理される可能性があるため、マップ関数ではこれが可能ではないと私は考えています。Spark Dataframe:マップ関数の次のレコードへのアクセス
ありがとうございました!
スパーク1.4の場合か、ハイブコンテキストを使用することができれば、後で、follwingコードはあなたのために働く可能性があります
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql._
val hc = new HiveContext(sc)
val df = hc.read.format("...").load("...")
val timestamp_column = df("timestamp_column")
val next_row_timestamp = lead(timestamp_column, 1).over(Window.orderBy(timestamp_column))
val newDF = df.withColumn("time_difference", next_row_timestamp.cast(LongType) - timestamp_column.cast(LongType))
説明:このコードで
、私はlead(e: Column, offset: Int)
を使用していますfunctions
パッケージ(doc)で利用可能なウィンドウ関数です。この関数は、実際には、(この例ではtimestamp_column
)の列にデータがoffset
(この例では)でシフトした新しい列を作成します。正しく動作させるには、Windowオブジェクトを使用してウィンドウを定義するover(window: WindowSpec)
呼び出しを続ける必要があります。このウィンドウは、パーティションと順序で構成されています。この場合は、Window.orderBy
を使用して注文します。
最後に、を使用して、元のDataFrameとの差を秒単位(またはミリ秒単位でわかりません)で列に追加します。詳細については
、次のリンクを例に、非常によく考えを説明しては: https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
編集:コメントで指摘したように
、上記の溶液は、非常にすることができ非効率的な。代替として、それはRDD溶液を使用することが可能です:
val newRDD = df.rdd.zipWithIndex.flatMap {
case (row, idx) => (0 to 1).map { lag => (idx - lag, row) }
}
.groupByKey
.values
.map { pair =>
val pairArray = pair.toArray
val timeDiff = {
if (pairArray.length == 1) null
else pairArray(1).getAs[java.sql.Timestamp]("timestamp_column").getTime - pairArray(0).getAs[java.sql.Timestamp]("timestamp_column").getTime
}
Row.merge(Row(timeDiff), pairArray(0))
}
val newSchema = StructType(StructField("time_diff", LongType, true) +: df.schema.fields)
val newDf = df.sqlContext.createDataFrame(newRDD, newSchema)
newDF
で得られたデータフレームは、現在の行と次のものとの間のミリ秒単位の時間差を含む新しい列「time_diff」を有することになります。
HiveContextを使用できますか?もしそうなら、あなたは窓関数(あなたがスパーク1.4+を持っていると仮定して) –
thxを返答用に使うことができます。私はSpark 1.5を持っていますが、私はWindow関数を使ったことはありません。私はそれが何を参照してくださいします。 – mhaddad
例を使ってレスポンスを追加しました。それが役立つ場合は、正しいものとしてマークしてください。ありがとう! –