2016-10-09 10 views
0

私は、(Key、(Timestamp、Value))のエントリからなるPair RDDを持っています。タイムスタンプのギャップの発見

データを読み取るとき、エントリはタイムスタンプでソートされるため、RDDの各パーティションはタイムスタンプによって順序付けされる必要があります。私がしたいのは、2つのシーケンシャルタイムスタンプの間の最大のギャップであるすべてのキーを見つけることです。

私は長い間この問題について考えていますが、私は火花が提供する機能を考えればどのように実現できるのか分かりません。私が見ている問題は次のとおりです。シンプルなマップを実行すると注文情報が失われるため、それは可能ではありません。特定のキーのエントリが多すぎるため、groupByKeyが失敗したようにも思えます。これを行うと、私にはjava.io.IOException: No space left on device

どのように対処するかについての助けが得られます。

+1

ソート。それはデータを直線的に通過します。 –

+1

私はおそらくスパークでこれをしないでしょう。本質的に線形パスであるものは、スパークによく合いません。あなたはおそらくDFに変換してウィンドウを使用することができますが、私はそれをやったことはありません。 https://cran.r-project.org/web/packages/dplyr/vignettes/window-functions.html関連性があります –

+0

@TheArchetypalPaulより適した一般的な技術を教えてください – amaik

答えて

2

The Archetypal Paulのように、DataFrameとウィンドウ関数を使用できます。まず必要な輸入:

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions.lag 

次のデータがDataFrameに変換する必要があります。

val keyTimestampWindow = Window.partitionBy("key").orderBy("timestamp") 

val df = rdd.mapValues(_._1).toDF("key", "timestamp") 

たちは、ウィンドウの定義をする必要がありますlag機能を使用できるようにするには

val withGap = df.withColumn(
    "gap", $"timestamp" - lag("timestamp", 1).over(keyTimestampWindow) 
) 
を選択するために使用できます。

最後にgroupBymaxで:

withGap.groupBy("key").max("gap") 

あなたはキーとタイムスタンプによってソートはThe Archetypal Paulことにより、第2のアドバイスをすることができます後。

キーでスライドすると還元することにより、各キーの最大ギャップを見つけることができます。このように配置されたデータと
val sorted = rdd.mapValues(_._1).sortBy(identity) 

import org.apache.spark.mllib.rdd.RDDFunctions._ 

sorted.sliding(2).collect { 
    case Array((key1, val1), (key2, val2)) if key1 == key2 => (key1, val2 - val1) 
}.reduceByKey(Math.max(_, _)) 

同じ考えの別の変形再パーティションとソート最初にする:

val partitionedAndSorted = rdd 
    .mapValues(_._1) 
    .repartitionAndSortWithinPartitions(
    new org.apache.spark.HashPartitioner(rdd.partitions.size) 
) 

このようなデータは、変換することができます

val lagged = partitionedAndSorted.mapPartitions(_.sliding(2).collect { 
    case Seq((key1, val1), (key2, val2)) if key1 == key2 => (key1, val2 - val1) 
}, preservesPartitioning=true) 

reduceByKey:キーとタイムスタンプで

lagged.reduceByKey(Math.max(_, _)) 
関連する問題