2017-05-19 15 views
0

と相関列の値がのは、私は3列のデータフレームがあるとしましょう取得:今、集計

itemid, date, price 
1, 2017-05-18, $1.10 
2, 2017-05-18, $2.20 
1, 2017-04-12, $0.90 
1, 2017-03-29, $1.00 

を、私はアイテムIDでグループ化する、最も早い日付を取得し、価格を取得最も早い日付に一致する。 (我々は(アイテムID、日付)が一意であると仮定することができます)

を上記の入力に対する出力は次のようになります。

SQLで
1, 2017-03-29, $1.00 
2, 2017-05-18, $2.20 

、私は自己結合を使用してこれを行うことができます - 最初の選択します各itemidの最小日付を選択し、日付とその最小日付が一致する価格と日付を選択します。

Scala Spark DataFramesでこれをどのように表現できますか? 答えがまだ自己結合に関係する場合、Spark 1.6のDataFrameクエリ実行者は、実際に結合を実現しないほどスマートになっていますか?

答えて

1

一つのアプローチは、次のようなSparkSQL窓関数を使用することです:

import org.apache.spark.sql.expressions.Window 

val df = Seq(
    (1, "2017-05-18", 1.10), 
    (2, "2017-05-18", 2.20), 
    (1, "2017-04-12", 0.90), 
    (1, "2017-03-29", 1.00) 
).toDF(
    "itemid", "date", "price" 
).as[(Integer, String, Double)] 

// Add earliest date by itemid via window function and 
// keep only rows with earliest date by itemid 
val df2 = df.withColumn("earliestDate", min("date").over(
    Window.partitionBy("itemid") 
)). 
    where($"date" === $"earliestDate") 

df2.show 
+------+----------+-----+------------+ 
|itemid|  date|price|earliestDate| 
+------+----------+-----+------------+ 
|  1|2017-03-29| 1.0| 2017-03-29| 
|  2|2017-05-18| 2.2| 2017-05-18| 
+------+----------+-----+------------+ 
+0

は、ソリューションをありがとうございました。これは自己結合アプローチと多少似ていますが、(itemid、date)はすでに保証されたユニークキーであるため、標準集計を使用してIDごとの最小価格を計算してからそれに再参加する。私は、行IDを使用する代わりに、私はユニークなキーを使用することができますし、ウィンドウを使用する代わりに、私はちょうどgroupBy()を使用することができます。 –

+0

@Jon Watte、はい、groupByを使用し、自己(itemid、date)を一意にすることで自己結合し、一意の列を作成する必要はありません。実際、この単純なケースでは、ウィンドウ関数が使用されている場合、自己結合の必要はありません(したがって、一意のROWIDを作成する必要はありません)。私は私の答えを更新しました。 –

+0

明確化と正解をありがとう! –