2016-11-29 12 views
-1

私は3つのデータフレームDF1、DF2、DF3を持っています。 DF1、DF2、DF3の中の特定の列の値をチェックし、最後にデータフレームDF1に新しい列を追加する必要があります。Spark Dataframes - 条件に基づいてSparkで3つのデータフレームを結合し、最初のデータフレームに新しい列を追加する方法?

Conditions are: 
    1. If DF1B and DF2A are equal AND 
    2. If DF1C and DF3A are equal 
then a new column should be added to the first Dataframe DF1. No rows should be missed from Dataframe1. 

DF1:

DF1A DF1B DF1C 
1   AA XY1 
2   BB XY2 
3   CC XY3 

DF2:

DF2A DF2B 
AA  Foo 
BB  Bar 
CC  Foo1 

DF3:

DF3A DF3B DF3C 
XY1  Hello LastCol1 
XY2  World LastCol2 
XY3  Welcome LastCol3 

予想される出力:

私が試してみました

マイコード:

val join1 = udm0to35.join(TSCdf, 
    udm0to35("TXN_SOURCE_APPLICATION") <=> (TSCdf.col("Application_System")) && 
     udm0to35("TXN_STATUS") <=> (TSCdf.col("Transaction_Status")), "inner") 




    val join2 = join1.join(CurrCutofdf, 
    (join1("COUNTRY") <=> (CurrCutofdf.col("Country")) && 
     join1("REMIT_CURRENCY") <=> (CurrCutofdf.col("Currency")) && 
     join1("TXN_PRODUCT_TYPE") <=> (CurrCutofdf.col("Product_Type")) && 
     (join1("BENE_COUNTRY") <=> (CurrCutofdf.col("Destination_Country")) || 
     join1("BENE_COUNTRY") <=> "NULL"), "inner") 





val join3 = join2.withColumn("UNIV_STATUS", univstat(join2.col("TXN_POST_DATE"),join2.col("Master_Status"))) 

注:join1とjoin2ため条件は一緒にされなければなりません。私が参加し、「同じ」状態でそれらを収納することができませんでしたので、しかし、私はそれらを分割されている

+0

何を試しましたか? – eliasah

+0

1インナーDF1およびDF2間の結合==>結果として生じるDFである:RES1 2 INNERはDF1とDF3間の結合==>結果として生じるDFである:RES2 次に(withcolumnを試してみました)。 条件を別々に処理するのではなく、別々に処理しているので、ここでのロジックが間違っていることが分かります。 また、3つのDFの間で結合を達成することができません。 –

+0

質問をあなたが試したコードで更新してください。 – eliasah

答えて

0

私はこれをしようとするあなたをお勧めします:

val DF12 = DF1.join(DF2, DF1.DF1B == DF2.DF2A, 'left_outer') 
var DF123 = DF12.join(DF3, DF12.DF1C == DF3.DF3A, 'left_outer') 
DF123 = DF123.withColumn("Status", <logic for Status calculation>) 
val Output = DF123.select("DF1A", "DF1B", "DF1C", "Status") 

ロジックは、最初の2左はすべて維持するために加入行うことです列をDF1から選択し、必要なロジックに従って新しい列を追加し、最後に必要な列のみを選択します。

+0

私はすでに指定しました.. 注:join1とjoin2の条件は一緒になってください。しかし、私は "同じ"結合条件でそれらを収容することができなかったので、それらを分割しました –

関連する問題