2017-09-30 9 views
0

したがって、私は2つのデータフレームを持っています。このような データフレーム1:2つのデータフレームを結合し、1つのデータフレームレコードを別のものに更新する

+----------+------+---------+--------+------+ 
|  OrgId|ItemId|segmentId|Sequence|Action| 
+----------+------+---------+--------+------+ 
|4295877341| 136|  4|  1| I|!|| 
|4295877346| 136|  4|  1| I|!|| 
|4295877341| 138|  2|  1| I|!|| 
|4295877341| 141|  4|  1| I|!|| 
|4295877341| 143|  2|  1| I|!|| 
|4295877341| 145|  14|  1| I|!|| 
| 123456789| 145|  14|  1| I|!|| 
| 809580109| 145|  9|  9| I|!|| 
+----------+------+---------+--------+------+ 

DataFrame2は

+----------+------+-----------+----------+--------+ 
|  OrgId|ItemId|segmentId_1|Sequence_1|Action_1| 
+----------+------+-----------+----------+--------+ 
|4295877343| 149|   15|   2| I|!|| 
|4295877341| 136|  null|  null| I|!|| 
| 123456789| 145|   14|   1| D|!|| 
|4295877341| 138|   11|  22| I|!|| 
|4295877341| 141|   10|   1| I|!|| 
|4295877341| 143|   1|   1| I|!|| 
| 809580109| 145|  NULL|  NULL| I|!|| 
+----------+------+-----------+----------+--------+ 

以下のように今私は、データフレーム2のレコードをマッチングして1列の両方のデータフレームの更新データフレームに参加する必要があります。

両方のデータフレームのキーは、OrgIdとItemIdです。

したがって、期待される出力は次のようになります。

+----------+------+---------+--------+------+ 
|  OrgId|ItemId|segmentId|Sequence|Action| 
+----------+------+---------+--------+------+ 
|4295877346| 136|  4|  1| I|!|| 
|4295877341| 145|  14|  1| I|!|| 
|4295877343| 149|  15|  2| I|!|| 
|4295877341| 136|  null| null| I|!|| 
|4295877341| 138|  11|  22| I|!|| 
|4295877341| 141|  10|  1| I|!|| 
|4295877341| 143|  1|  1| I|!|| 
| 809580109| 145|  9|  9| I|!|| 
+----------+------+---------+--------+------+ 

私はデータフレーム2のレコードでデータフレーム1を更新する必要があります。 データフレーム1のレコードが2で見つからない場合は、そのレコードも保持する必要があります。 、新しいレコードがデータフレーム2で発見された場合、レコードはここで出力

で追加する必要があることを私がやっているものです。..

val df3 = df1.join(df2, Seq("OrgId", "ItemId"), "outer") 
    .select($"OrgId", $"ItemId",$"segmentId_1",$"Sequence_1",$"Action_1") 
    .filter(!$"Action_1".contains("D")) 
    df3.show() 

しかし、私は、出力の下に取得しています。

+----------+------+-----------+----------+--------+ 
|  OrgId|ItemId|segmentId_1|Sequence_1|Action_1| 
+----------+------+-----------+----------+--------+ 
|4295877343| 149|   15|   2| I|!|| 
|4295877341| 136|  null|  null| I|!|| 
|4295877341| 138|   11|  22| I|!|| 
|4295877341| 141|   10|   1| I|!|| 
|4295877341| 143|   1|   1| I|!|| 
+----------+------+-----------+----------+--------+ 

私は、データフレーム1から4295877346| 136| 4| 1| I|!|記録...

left_outerは、出力の下に私を与えるを取得していない午前

+----------+------+-----------+----------+--------+ 
|  OrgId|ItemId|segmentId_1|Sequence_1|Action_1| 
+----------+------+-----------+----------+--------+ 
|4295877341| 136|  null|  null| I|!|| 
|4295877341| 138|   11|  22| I|!|| 
|4295877341| 141|   10|   1| I|!|| 
|4295877341| 143|   1|   1| I|!|| 
+----------+------+-----------+----------+--------+ 

答えて

1

最初にあなたの間違いを説明しましょう。

あなただけ

val df3 = df1.join(df2, Seq("OrgId", "ItemId"), "outer") 
df3.show() 

以下のように参加する場合は、

+----------+------+---------+--------+------+-----------+----------+--------+ 
|  OrgId|ItemId|segmentId|Sequence|Action|segmentId_1|Sequence_1|Action_1| 
+----------+------+---------+--------+------+-----------+----------+--------+ 
|4295877346| 136|  4|  1| I|!||  null|  null| null| 
|4295877341| 145|  14|  1| I|!||  null|  null| null| 
|4295877343| 149|  null| null| null|   15|   2| I|!|| 
|4295877341| 136|  4|  1| I|!||  null|  null| I|!|| 
| 123456789| 145|  14|  1| I|!||   14|   1| D|!|| 
|4295877341| 138|  2|  1| I|!||   11|  22| I|!|| 
|4295877341| 141|  4|  1| I|!||   10|   1| I|!|| 
|4295877341| 143|  2|  1| I|!||   1|   1| I|!|| 
+----------+------+---------+--------+------+-----------+----------+--------+ 

を取得しますあなたのコード内のfilterAction_1列に同様

nullをフィルタリングしていることが明らかいっぱいですだから、あなたのために働くコードはの後に得たnullの値を変更することですは、データが存在する他のテーブルのデータを有効にします。

val df3 = df1.join(df2, Seq("OrgId", "ItemId"), "outer") 
    .withColumn("segmentId_1", when($"segmentId_1".isNotNull, $"segmentId_1").otherwise($"segmentId")) 
    .withColumn("Sequence_1", when($"Sequence_1".isNotNull, $"Sequence_1").otherwise($"Sequence")) 
    .withColumn("Action_1", when($"Action_1".isNotNull, $"Action_1").otherwise($"Action")) 
    .select($"OrgId", $"ItemId",$"segmentId_1",$"Sequence_1",$"Action_1") 
    .filter(!$"Action_1".contains("D")) 
df3.show() 

あなたは、レコードのわずかな変化がある

+----------+------+-----------+----------+--------+ 
|  OrgId|ItemId|segmentId_1|Sequence_1|Action_1| 
+----------+------+-----------+----------+--------+ 
|4295877346| 136|   4|   1| I|!|| 
|4295877341| 145|   14|   1| I|!|| 
|4295877343| 149|   15|   2| I|!|| 
|4295877341| 136|  null|  null| I|!|| 
|4295877341| 138|   11|  22| I|!|| 
|4295877341| 141|   10|   1| I|!|| 
|4295877341| 143|   1|   1| I|!|| 
+----------+------+-----------+----------+--------+ 
+0

として所望の出力を取得する必要があります...ある特定の列について、DF1の最後のレコードのようにnull値を取得し、DF1の更新がDF2の最後のレコードがnullの場合、その列のDF1値のみを保持する必要があります。 –

+0

申し訳ありません@アナプム、私はあなたの要件を明確に理解していませんでした。もう少し説明できますか? –

+0

したがって、列の値がnullの場合、DF2列の値で更新する必要はありません。文字列としてNULLを返します。 –

0

代わりに、外側の左外側試してみてください:

val df3 = df1.join(df2, Seq("OrgId", "ItemId"), "left_outer") 
    .select($"OrgId", $"ItemId",$"segmentId_1",$"Sequence_1",$"Action_1") 
    .filter(!$"Action_1".contains("D")) 
    df3.show() 

左のアウターは、左にマッチしないものをすべて保持する必要があります。

素敵なチュートリアルhere

関連する問題