2017-10-25 9 views
-4

こんにちは私は、別のデータフレームに基づいて更新する必要のあるデータフレームを持っています。ここでは、2番目のデータフレームによって提供される新しい値を取るために、私は、フィールドSALE_PRICEを追加し、第二データフレーム が提供する新しいSALE_PRICEがどのようにこの要求は、事前にユニオン関数に基づいてデータフレームを更新する

val df4 = df3.groupBy("pos_id", "article_id").agg($"pos_id", $"article_id", max("date"), sum("qte"), sum("ca")) 

多くのおかげだろう考慮したい場合、私は、どのように要求は次のようになり

val hist1 = spark.read 
     .format("csv") 
     .option("header", "true") //reading the headers 
     .load("C:/Users/MHT/Desktop/histocaisse_dte1.csv") 
     .withColumn("article_id", 'article_id.cast(LongType)) 
     .withColumn("pos_id", 'pos_id.cast(LongType)) 
     .withColumn("qte", 'qte.cast(LongType)) 
     .withColumn("ca", 'ca.cast(DoubleType)) 

    hist1.show 

    val hist2 = spark.read 
     .format("csv") 
     .option("header", "true") //reading the headers 
     .load("C:/Users/MHT/Desktop/his2.csv") 
     .withColumn("article_id", 'article_id.cast(LongType)) 
     .withColumn("date", 'date.cast(DateType)) 
     .withColumn("qte", 'qte.cast(LongType)) 
     .withColumn("ca", 'ca.cast(DoubleType)) 

    hist2.show 

    val df3 = hist1.unionAll(hist2) 
    //  
    val df4 = df3.groupBy("pos_id", "article_id").agg($"pos_id", $"article_id", max("date"), sum("qte"), sum("ca")) 
    df4.show 

+------+----------+----------+---+----+----------+ 
|pos_id|article_id|  date|qte| ca|sale_price| 
+------+----------+----------+---+----+----------+ 
|  1|   1|2000-01-07| 3| 3.5|  14.3| 
|  2|   2|2000-01-07| 15|12.0|  13.2| 
|  3|   2|2000-01-07| 4| 1.2|  14.3| 
|  4|   2|2000-01-07| 4| 1.2|  12.3| 
+------+----------+----------+---+----+----------+ 

+------+----------+----------+---+----+----------+ 
|pos_id|article_id|  date|qte| ca|sale_price| 
+------+----------+----------+---+----+----------+ 
|  1|   1|2000-01-08| 3| 3.5|  14.5| 
|  2|   2|2000-01-08| 15|12.0|  20.2| 
|  3|   2|2000-01-08| 4| 1.2|  17.5| 
|  4|   2|2000-01-08| 4| 1.2|  18.2| 
|  5|   3|2000-01-08| 15| 1.2|  11.2| 
|  6|   1|2000-01-08| 2|1.25|  13.5| 
|  6|   2|2000-01-08| 2|1.25|  14.3| 
+------+----------+----------+---+----+----------+ 



    +------+----------+----------+--------+-------+ 
|pos_id|article_id| max(date)|sum(qte)|sum(ca)| 
+------+----------+----------+--------+-------+ 
|  2|   2|2000-01-08|  30| 24.0| 
|  3|   2|2000-01-08|  8| 2.4| 
|  1|   1|2000-01-08|  6| 7.0| 
|  5|   3|2000-01-08|  15| 1.2| 
|  6|   1|2000-01-08|  2| 1.25| 
|  6|   2|2000-01-08|  2| 1.25| 
|  4|   2|2000-01-08|  8| 2.4| 
+------+----------+----------+--------+-------+ 

をしました

あなたがあなたの所望の出力を持っている必要があり、次の

val df4 = df3.groupBy("pos_id", "article_id").agg(max("date"), sum("qte"), sum("ca")).join(hist2.select("pos_id", "article_id", "sale_price"), Seq("pos_id", "article_id")) 

として最後の行にjoinを使用することができます

答えて

0

関連する問題