私は3つのデータフレームDF1、DF2、DF3を持っています。 DF1、DF2、DF3の中の特定の列の値をチェックし、最後にデータフレームDF1に新しい列を追加する必要があります。Spark Dataframes - 条件に基づいてSparkで3つのデータフレームを結合し、最初のデータフレームに新しい列を追加する方法?
Conditions are:
1. If DF1B and DF2A are equal AND
2. If DF1C and DF3A are equal
then a new column should be added to the first Dataframe DF1. No rows should be missed from Dataframe1.
DF1:
DF1A DF1B DF1C
1 AA XY1
2 BB XY2
3 CC XY3
DF2:
DF2A DF2B
AA Foo
BB Bar
CC Foo1
DF3:
DF3A DF3B DF3C
XY1 Hello LastCol1
XY2 World LastCol2
XY3 Welcome LastCol3
予想される出力:
私が試してみましたマイコード:
val join1 = udm0to35.join(TSCdf,
udm0to35("TXN_SOURCE_APPLICATION") <=> (TSCdf.col("Application_System")) &&
udm0to35("TXN_STATUS") <=> (TSCdf.col("Transaction_Status")), "inner")
val join2 = join1.join(CurrCutofdf,
(join1("COUNTRY") <=> (CurrCutofdf.col("Country")) &&
join1("REMIT_CURRENCY") <=> (CurrCutofdf.col("Currency")) &&
join1("TXN_PRODUCT_TYPE") <=> (CurrCutofdf.col("Product_Type")) &&
(join1("BENE_COUNTRY") <=> (CurrCutofdf.col("Destination_Country")) ||
join1("BENE_COUNTRY") <=> "NULL"), "inner")
val join3 = join2.withColumn("UNIV_STATUS", univstat(join2.col("TXN_POST_DATE"),join2.col("Master_Status")))
注:join1とjoin2ため条件は一緒にされなければなりません。私が参加し、「同じ」状態でそれらを収納することができませんでしたので、しかし、私はそれらを分割されている
何を試しましたか? – eliasah
1インナーDF1およびDF2間の結合==>結果として生じるDFである:RES1 2 INNERはDF1とDF3間の結合==>結果として生じるDFである:RES2 次に(withcolumnを試してみました)。 条件を別々に処理するのではなく、別々に処理しているので、ここでのロジックが間違っていることが分かります。 また、3つのDFの間で結合を達成することができません。 –
質問をあなたが試したコードで更新してください。 – eliasah