2016-05-04 4 views
2

タイムスタンプ列を持つDFがあり、この列でソートされています。これを行う方法はありますか?各レコードについて、次のレコードにアクセスして2つの行の間の時間差を計算しますか? 2つの行が異なるノードで処理される可能性があるため、マップ関数ではこれが可能ではないと私は考えています。Spark Dataframe:マップ関数の次のレコードへのアクセス

ありがとうございました!

+0

HiveContextを使用できますか?もしそうなら、あなたは窓関数(あなたがスパーク1.4+を持っていると仮定して) –

+0

thxを返答用に使うことができます。私はSpark 1.5を持っていますが、私はWindow関数を使ったことはありません。私はそれが何を参照してくださいします。 – mhaddad

+0

例を使ってレスポンスを追加しました。それが役立つ場合は、正しいものとしてマークしてください。ありがとう! –

答えて

2

スパーク1.4の場合か、ハイブコンテキストを使用することができれば、後で、follwingコードはあなたのために働く可能性があります

import org.apache.spark.sql.hive.HiveContext 
import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types.LongType 
import org.apache.spark.sql._ 

val hc = new HiveContext(sc) 
val df = hc.read.format("...").load("...") 

val timestamp_column = df("timestamp_column") 
val next_row_timestamp = lead(timestamp_column, 1).over(Window.orderBy(timestamp_column)) 

val newDF = df.withColumn("time_difference", next_row_timestamp.cast(LongType) - timestamp_column.cast(LongType)) 

説明:このコードで

、私はlead(e: Column, offset: Int)を使用していますfunctionsパッケージ(doc)で利用可能なウィンドウ関数です。この関数は、実際には、(この例ではtimestamp_column)の列にデータがoffset(この例では)でシフトした新しい列を作成します。正しく動作させるには、Windowオブジェクトを使用してウィンドウを定義するover(window: WindowSpec)呼び出しを続ける必要があります。このウィンドウは、パーティションと順序で構成されています。この場合は、Window.orderByを使用して注文します。

最後に、を使用して、元のDataFrameとの差を秒単位(またはミリ秒単位でわかりません)で列に追加します。詳細については

、次のリンクを例に、非常によく考えを説明しては: https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html


編集:コメントで指摘したように

、上記の溶液は、非常にすることができ非効率的な。代替として、それはRDD溶液を使用することが可能です:

val newRDD = df.rdd.zipWithIndex.flatMap { 
    case (row, idx) => (0 to 1).map { lag => (idx - lag, row) } 
} 
.groupByKey 
.values 
.map { pair => 
    val pairArray = pair.toArray 
    val timeDiff = { 
    if (pairArray.length == 1) null 
    else pairArray(1).getAs[java.sql.Timestamp]("timestamp_column").getTime - pairArray(0).getAs[java.sql.Timestamp]("timestamp_column").getTime 
    } 
    Row.merge(Row(timeDiff), pairArray(0)) 
} 

val newSchema = StructType(StructField("time_diff", LongType, true) +: df.schema.fields) 
val newDf = df.sqlContext.createDataFrame(newRDD, newSchema) 

newDFで得られたデータフレームは、現在の行と次のものとの間のミリ秒単位の時間差を含む新しい列「time_diff」を有することになります。

+0

完璧にクリア、ありがとうございます。 – mhaddad

+0

これは実際には非常に非効率的で実質的に役に立たない。 – zero323

+0

@ zero323だから何が代わりになるだろうか?現時点では、 'df.rdd.zipWithIndex.map'とそれに続くjoinまたはgroupを考えることができるのは –

関連する問題