2017-11-29 15 views
0

差があるスパークに新しい列として値を一致追加比較します変更した場合、および&のいずれの列に変更がある場合は、この値を列として追加します。このは、二つのデータフレームが誤って2つのレコード間

enter image description here

+0

あなたは**発言を、列のコンテンツを作成する方法を意味するか**?もしそうなら、その列の内容を完全に表示できますか? –

+0

を除き、マイナス演算の一種です。人々が理解してよりよく答えるのを助ける質問にいくつかの例を追加することを提案します。 –

+0

例:古いデータでは、 firstname、lastname、addressが新しいデータと一致しません。備考欄にこの情報を記入してください。
例:古いデータジェームスでは、新しいデータニチン。今は一致していないfirstnameとしてこの情報を取得します。同時に、2番目の列の列を比較します。この情報を同じ列にも入れてください。 –

答えて

1

のような予想される出力は、UDFのデータを比較し、値を返し、両方の列の値(古いものと新しい値)を渡し、後で列とUDFを使用して、プライマリキー上の2つのデータフレームに参加します同じでない場合。

val check = udf ((old_val:String,new_val:String) => if (old_val == new_val) new_val else "") 

df_check= df 
    .withColumn("Check_Name",check(df.col("name"),df.col("new_name"))) 
    .withColumn("Check_Namelast",check(df.col("lastname"),df.col("new_lastname"))) 

はdef機能

  def fn(old_df:Dataframe,new_df:Dataframe) : Dataframe = 
      { 
      val old_df_array = old_df.collect() //make df to array to loop thru 
      val new_df_array = new_df.collect() //make df to array to loop thru 
      var value_change : Array[String] = "" 

      val count = old_df.count 
      val row_count = old_df.coloumn 
      val row_c = row.length 
      val coloumn_name = old_df.coloumn 

      for (i to count) //loop thru all rows 
      { 
      var old = old_df_array.Map(x => x.split(",")) 
      var new = new_df_array.Map(x => x.split(",")) 
      for (j to row_c) //loop thru all coloumn 
      { 
      if(old(j) != new(j)) 
      { 
      value_change = value_change + coloumn_name(j) " has value changed" ///this will add all changes in one full row 
      } 
      //append to array 
      append j(0) //primary key 
      append value_change //Remarks coloumn 
      } 
      } 
      //convert array to df 
      } 
関連する問題