2016-08-24 8 views
0

SparkとScalaの新機能です。私は膨大な量のデータを持つデータフレームを持っています。スキーマはこのようなものです。データフレーム変換

このdataframe empDF呼び出すことができます:

 id name  emp_id code date 
     1 Andrew D01  C101 2012-06-14 
     2 James  D02  C101 2013-02-26 
     3 James  D02  C102 2013-12-29 
     4 James  D02  C101 2010-09-27 
     5 Andrew D01  C101 2013-10-12 
     6 Andrew D01  C102 2011-10-13 

を私はDataFrame[Row]オブジェクトとしてデータベースからこのデータを読み込みます。

コードC101のレベルが1より大きい場合は、他のコードのレベルを0に設定する必要があります。前のレコードがない場合は、のレベルは1に設定されます。そのレコードより2年以上前のレコードがある場合、レベルは2に設定されます。このステップのデータフレームの後 がこの従業員の古い記録があり、2つの行の間の日付の差が二年以上であるため、第1および第2の行は、レベル2を持っているこの

 id name  emp_id code date   level 
     1 Andrew D01  C101 2012-06-14  2 
     2 James  D02  C101 2013-02-26  2 
     3 James  D02  C102 2012-12-29  0 
     4 James  D02  C101 2010-09-27  1 
     5 Andrew D01  C101 2009-10-12  1 
     6 Andrew D01  C102 2010-10-13  0 

のようになります。レベル1の行は、以前の日付のレコードがなく、レベル0の行が0以外のレベルになっているためです。C101

レベル2の行については、チェックする必要がありますそれが適用される場合、コードC102が昨年の従業員に適用される場合、レベルを3に設定します。そうでない場合、レベルを変更しないでください。また、最終結果のデータフレームでは、コードC101以外の行はすべて削除する必要があります。この従業員は、第二列は昨年内C102を持っているが、昨年内にはC102を持っていないので、最初の行はレベル2を持っていることを

 id name emp_id code date  level 
    1 Andrew D01  C101 2012-06-14 2 
    2 James D02  C101 2013-02-26 3 
    4 James D02  C101 2010-09-27 1 
    5 Andrew 2013 C101 2013-10-12 1 

お知らせ:結果のデータフレームは次のようになります。上記の二つのステップの後

。 データフレームAPIを使用してScalaでこれを行うにはどうすればいいですか?mapflatmapreduceなどの機能はどうですか?

+0

あなたは何を試してみましたか?期待どおりの結果を得ましたか? [sscce](http://www.sscce.org)を参照してください。 –

+0

また、私は英語を整理しようとしましたが、私の解釈が正確であることをもう一度確認してください。将来、* short *文を使用して、あなたが参照していることを明確にしてください。 (「それ」ではなく「レコード」を使用してください。「履歴」の使用も非常に奇妙です。)そして、「試した」コードを表示していなければ、助けを得ることはできません。質問が終了する可能性があります。 –

+0

「Gaudiness」?なぜいいの!私は最も派手である。 –

答えて

2

あなたはwindow functionsを使用することができます。

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions._ 

val window = Window.partitionBy("name").orderBy("date") 
val lagCol = lag(col("date"), 1).over(window) 
val diff = datediff(col("date"), col("previousDate")) 
val level = when(
    col("previousDate").isNull || (diff <= 730), 1 
).otherwise(2) 

val newDF = empDF 
    .where(col("code") === "C101") 
    .withColumn("previousDate", lagCol) 
    .withColumn("level", level) 
    .drop("previousDate") 

newDF.orderBy("id").show 

+---+------+------+----+----------+-----+ 
| id| name|emp_id|code|  date|level| 
+---+------+------+----+----------+-----+ 
| 1|Andrew| D01|C101|2012-06-14| 1| 
| 2|James | D02|C101|2013-02-26| 2| 
| 4|James | D02|C101|2010-09-27| 1| 
+---+------+------+----+----------+-----+ 
+0

@thanx Daniel de Paula私は本当にあなたの答えに感謝します。あなたの答えでnewDFである結果的なデータフレームにもう一つのステップがあります。レベル3としてこの行をマークすると、昨年の従業員にコードC102があるかどうかを確認する必要があります。 –

+0

@AtifShahzad、しかし最初のステップC101とは異なるコードですべての行を削除するので、コードC102の行は存在しません。私は本当に理解していない。 –

+0

最初のステップではC101だけを処理する必要がありますが、最後に処理したときに最初のステップでC102が処理された行にのみ存在するかどうかを確認する必要があります。最初のデータフレームにはC103、C104などがあるかもしれませんが、最後のステップでC102だけを確認する必要があります。 –

0
// Input data 
val df = { 
    import org.apache.spark.sql._ 
    import org.apache.spark.sql.types._ 
    import scala.collection.JavaConverters._ 
    import java.time.LocalDate 

    val simpleSchema = StructType(
     StructField("id", IntegerType) :: 
     StructField("name", StringType) :: 
     StructField("emp_id", StringType) :: 
     StructField("code", StringType) :: 
     StructField("date", DateType) :: Nil) 

    val data = List(
     Row(1, "Andrew", "D01", "C101", java.sql.Date.valueOf(LocalDate.of(2012, 6, 14))), 
     Row(2, "James", "D02", "C101", java.sql.Date.valueOf(LocalDate.of(2013, 2, 26))), 
     Row(3, "James", "D02", "C102", java.sql.Date.valueOf(LocalDate.of(2013, 12, 29))), 
     Row(4, "James", "D02", "C101", java.sql.Date.valueOf(LocalDate.of(2010, 9, 27))) 
    )  

    spark.createDataFrame(data.asJava, simpleSchema) 
} 
df.show() 
// Filter and level calculation. 
val df2 = df.filter(col("code") === "C101"). 
    withColumn("level", when(datediff(col("date"), min(col("date")).over(Window.partitionBy("emp_id"))) >= 365 * 2, 2).otherwise(1)) 
df2.show() 
関連する問題