The Archetypal Paulのように、DataFrame
とウィンドウ関数を使用できます。まず必要な輸入:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag
次のデータがDataFrame
に変換する必要があります。
val keyTimestampWindow = Window.partitionBy("key").orderBy("timestamp")
:
val df = rdd.mapValues(_._1).toDF("key", "timestamp")
たちは、ウィンドウの定義をする必要がありますlag
機能を使用できるようにするには
val withGap = df.withColumn(
"gap", $"timestamp" - lag("timestamp", 1).over(keyTimestampWindow)
)
を選択するために使用できます。
最後にgroupBy
max
で:
withGap.groupBy("key").max("gap")
あなたはキーとタイムスタンプによってソートはThe Archetypal Paulことにより、第2のアドバイスをすることができます後。
キーでスライドすると還元することにより、各キーの最大ギャップを見つけることができます。このように配置されたデータと val sorted = rdd.mapValues(_._1).sortBy(identity)
:
import org.apache.spark.mllib.rdd.RDDFunctions._
sorted.sliding(2).collect {
case Array((key1, val1), (key2, val2)) if key1 == key2 => (key1, val2 - val1)
}.reduceByKey(Math.max(_, _))
同じ考えの別の変形再パーティションとソート最初にする:
val partitionedAndSorted = rdd
.mapValues(_._1)
.repartitionAndSortWithinPartitions(
new org.apache.spark.HashPartitioner(rdd.partitions.size)
)
を このようなデータは、変換することができます
val lagged = partitionedAndSorted.mapPartitions(_.sliding(2).collect {
case Seq((key1, val1), (key2, val2)) if key1 == key2 => (key1, val2 - val1)
}, preservesPartitioning=true)
とreduceByKey
:キーとタイムスタンプで
lagged.reduceByKey(Math.max(_, _))
ソート。それはデータを直線的に通過します。 –
私はおそらくスパークでこれをしないでしょう。本質的に線形パスであるものは、スパークによく合いません。あなたはおそらくDFに変換してウィンドウを使用することができますが、私はそれをやったことはありません。 https://cran.r-project.org/web/packages/dplyr/vignettes/window-functions.html関連性があります –
@TheArchetypalPaulより適した一般的な技術を教えてください – amaik