2017-08-15 10 views
0

こんにちは私はspark/scalaの新機能ですが、特定の再帰式に基づいてスパークデータフレームの列を作成しようとしています:Spark Scalaの列に累積/反復Costumメソッドを実行する

ここは疑似コードです。ここでは、より詳細にダイビングを

someDf.col2[0] = 0 

for i > 0 
someDf.col2[i] = x * someDf.col1[i-1] + (1-x) * someDf.col2[i-1] 

は私の出発点は次のとおりです。 このデータフレームは、両方dates、個々のid年代のレベルに集計した結果です。

これ以上の計算はその特定のidに関して行われなければならず、前週に何が起こったかを考慮する必要があります。

これを説明するために、値を0と1に単純化し、乗算器x1-xを削除しました。col2もゼロに初期化しました。

var someDf = Seq(("2016-01-10 00:00:00.0","385608",0,0), 
     ("2016-01-17 00:00:00.0","385608",0,0), 
     ("2016-01-24 00:00:00.0","385608",1,0), 
     ("2016-01-31 00:00:00.0","385608",1,0), 
     ("2016-02-07 00:00:00.0","385608",1,0), 
     ("2016-02-14 00:00:00.0","385608",1,0), 
     ("2016-01-17 00:00:00.0","105010",0,0), 
     ("2016-01-24 00:00:00.0","105010",1,0), 
     ("2016-01-31 00:00:00.0","105010",0,0), 
     ("2016-02-07 00:00:00.0","105010",1,0) 
     ).toDF("dates", "id", "col1","col2") 

someDf.show() 

+--------------------+------+----+----+ 
|    dates| id|col1|col2| 
+--------------------+------+----+----+ 
|2016-01-10 00:00:...|385608| 0| 0| 
|2016-01-17 00:00:...|385608| 0| 0| 
|2016-01-24 00:00:...|385608| 1| 0| 
|2016-01-31 00:00:...|385608| 1| 0| 
|2016-02-07 00:00:...|385608| 1| 0| 
|2016-02-14 00:00:...|385608| 1| 0| 
+--------------------+------+----+----+ 
|2016-01-17 00:00:...|105010| 0| 0| 
|2016-01-24 00:00:...|105010| 1| 0| 
|2016-01-31 00:00:...|105010| 0| 0| 
|2016-02-07 00:00:...|105010| 1| 0| 
+--------------------+------+----+----+ 

私は

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.expressions.Window 

val date_id_window = Window.partitionBy("id").orderBy(asc("dates")) 

someDf.withColumn("col2", lag($"col1",1).over(date_id_window) + 
lag($"col2",1).over(date_id_window)).show() 


+--------------------+------+----+----+/+--------------------+ 
|    dates| id|col1|col2|/| what_col2_should_be| 
+--------------------+------+----+----+/+--------------------+ 
|2016-01-17 00:00:...|105010| 0|null|/|     0| 
|2016-01-24 00:00:...|105010| 1| 0|/|     0| 
|2016-01-31 00:00:...|105010| 0| 1|/|     1| 
|2016-02-07 00:00:...|105010| 1| 0|/|     1| 
+-------------------------------------+/+--------------------+ 
|2016-01-10 00:00:...|385608| 0|null|/|     0| 
|2016-01-17 00:00:...|385608| 0| 0|/|     0| 
|2016-01-24 00:00:...|385608| 1| 0|/|     0| 
|2016-01-31 00:00:...|385608| 1| 1|/|     1| 
|2016-02-07 00:00:...|385608| 1| 1|/|     2| 
|2016-02-14 00:00:...|385608| 1| 1|/|     3| 
+--------------------+------+----+----+/+--------------------+ 

希望するものを対これまでに試してみましたが、火花データフレームでこれを行うにはそこの方法ですが、私は複数の累積型計算を見て、同じ列を含むことがありません、問題は、行i-1の新しく計算された値は考慮されず、代わりに古いi-1が常に0であるということです。

助けてください。

答えて

0

Datasetがうまく動作するはずです:

val x = 0.1 

case class Record(dates: String, id: String, col1: Int) 

someDf.drop("col2").as[Record].groupByKey(_.id).flatMapGroups((_, records) => { 
    val sorted = records.toSeq.sortBy(_.dates) 
    sorted.scanLeft((null: Record, 0.0)){ 
    case ((_, col2), record) => (record, x * record.col1 + (1 - x) * col2) 
    }.tail 
}).select($"_1.*", $"_2".alias("col2")) 
0

をあなたが使用しているWindow機能をrowsBetween APIを使用することができますし、

+--------------------+------+----+----+ 
|    dates| id|col1|col2| 
+--------------------+------+----+----+ 
|2016-01-10 00:00:...|385608| 0| 0| 
|2016-01-17 00:00:...|385608| 0| 0| 
|2016-01-24 00:00:...|385608| 1| 0| 
|2016-01-31 00:00:...|385608| 1| 0| 
|2016-02-07 00:00:...|385608| 1| 0| 
|2016-02-14 00:00:...|385608| 1| 0| 
|2016-01-17 00:00:...|105010| 0| 0| 
|2016-01-24 00:00:...|105010| 1| 0| 
|2016-01-31 00:00:...|105010| 0| 0| 
|2016-02-07 00:00:...|105010| 1| 0| 
+--------------------+------+----+----+ 
として入力 dataframeを考えると、出力

val date_id_window = Window.partitionBy("id").orderBy(asc("dates")) 
someDf.withColumn("col2", sum(lag($"col1", 1).over(date_id_window)).over(date_id_window.rowsBetween(Long.MinValue, 0))) 
    .withColumn("col2", when($"col2".isNull, lit(0)).otherwise($"col2")) 
    .show() 

を希望している必要があります

ouが必要ですデータフレームTPUT

+--------------------+------+----+----+ 
|    dates| id|col1|col2| 
+--------------------+------+----+----+ 
|2016-01-17 00:00:...|105010| 0| 0| 
|2016-01-24 00:00:...|105010| 1| 0| 
|2016-01-31 00:00:...|105010| 0| 1| 
|2016-02-07 00:00:...|105010| 1| 1| 
|2016-01-10 00:00:...|385608| 0| 0| 
|2016-01-17 00:00:...|385608| 0| 0| 
|2016-01-24 00:00:...|385608| 1| 0| 
|2016-01-31 00:00:...|385608| 1| 1| 
|2016-02-07 00:00:...|385608| 1| 2| 
|2016-02-14 00:00:...|385608| 1| 3| 
+--------------------+------+----+----+ 

としてロジック上で適用した後、私は答えはあなたではなくvarとしてそれを処理するよりも、自分のデータフレームに変換を適用すべき

0

有用であると思います。あなたが欲しいものを得るための一つの方法は、累積的に前の行(すなわち、行-1)を介して、各ウィンドウのパーティション内の行のcol1の値を合計するウィンドウのrowsBetweenを使用することです:

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.expressions.Window 

val window = Window.partitionBy("id").orderBy("dates").rowsBetween(Long.MinValue, -1) 

val newDF = someDf. 
    withColumn(
    "col2", sum($"col1").over(window) 
).withColumn(
    "col2", when($"col2".isNull, 0).otherwise($"col2") 
).orderBy("id", "dates") 

newDF.show 
+--------------------+------+----+----+ 
|    dates| id|col1|col2| 
+--------------------+------+----+----+ 
|2016-01-17 00:00:...|105010| 0| 0| 
|2016-01-24 00:00:...|105010| 1| 0| 
|2016-01-31 00:00:...|105010| 0| 1| 
|2016-02-07 00:00:...|105010| 1| 1| 
|2016-01-10 00:00:...|385608| 0| 0| 
|2016-01-17 00:00:...|385608| 0| 0| 
|2016-01-24 00:00:...|385608| 1| 0| 
|2016-01-31 00:00:...|385608| 1| 1| 
|2016-02-07 00:00:...|385608| 1| 2| 
|2016-02-14 00:00:...|385608| 1| 3| 
+--------------------+------+----+----+