2017-12-22 14 views
0

私は、2つのキーに基づいて結合したい2つのデータフレームを持っています。一致するものがない場合は、キー1だけに基づいて一致を取り込みたい(キー1の複数のレコードがあり、いずれか1つを選択できますが、1つのレコードのみを取り込む必要があります)スパーク2ステップ参加

val df1 = spark.sparkContext.parallelize(List(("k0","k00","v0"), 
    ("k1","k11","v1"),("k2","k22","v2")) 
    ).toDF("key1","key2","val_type_a") 

    val df2 = spark.sparkContext.parallelize(List(("k0","k00","X"), 
    ("k1","XX","Y"),("k1","YY","Z"),("k2","ZZ","W")) 
    ).toDF("key1","key2","val_type_b") 



    val df1_df2=df1.join(df2,Seq("key1","key2"),"left") 
    df1.show 
    df1_df2.show 

Res

しかし、K1、およびK2行のために、私はval_type_bも埋めたいだけのキー1に基づく部分一致が可能のためのK1なので、それはYまたはZとk2のためのいずれかでありますそれはWです。 これを行う最も効率的な方法は?

答えて

1

2つの結合で行うことができます。まず、2つの列を結合し、次に残りの1つの列を結合します。

作業が完了したら、あなたの拳は、あなたが(最初に動作しませんでしたジョインところ)

var df2_single = df2.groupby("key1).agg(first("val_type_b").alias("val_type_b")) 

は、欠損値を選択します(最初の)一つの値だけを保つためにDF2に最初のGROUPBY +を使用することができます参加:

var missing = df1_df2.filter(col("val_types_b").isNull).drop("val_types_2") 

は、その後、あなたの左が再び参加します:

var df1_df2_missing = missing.join(df2_single, "key1", "left") 

、ユニオン最初JOIからの結果nと2番目の参加:

df1_df2 = df1_df2.filter(col("val_types_b").isNotNull).union(df1_df2_missing) 
関連する問題