2017-12-03 13 views
0

I持つ二つのスパークデータフレームのを、df1df2スパークSQLデータフレームAPI -buildフィルタ条件を動的

私は列の数に基づいて、 df1から 非一致レコードを取得する必要があります
+-------+-----+---+ 
| name|empNo|age| 
+-------+-----+---+ 
|shankar|12121| 28| 
| ramesh| 1212| 29| 
| suresh| 1111| 30| 
| aarush| 0707| 15| 
+-------+-----+---+ 

+------+-----+---+-----+ 
| eName| eNo|age| city| 
+------+-----+---+-----+ 
|aarush|12121| 15|malmo| 
|ramesh| 1212| 29|malmo| 
+------+-----+---+-----+ 

別のファイルで指定されています。たとえば、列のルックアップファイルは、以下のようなものです

df1col,df2col 
name,eName 
empNo, eNo 

期待出力は次のとおりです。

+-------+-----+---+ 
| name|empNo|age| 
+-------+-----+---+ 
|shankar|12121| 28| 
| suresh| 1111| 30| 
| aarush| 0707| 15| 
+-------+-----+---+ 

考え方は上記のために動的にWHERE条件を構築する方法でありますシナリオは、ルックアップファイルが設定可能であるため、1〜n個のフィールドを持つ可能性があります。

+0

をなぜ期待される出力のaarushはありますか?雇用者番号は2つのデータフレームで異なります。 – Shaido

+0

@Shaido:訂正してくれてありがとう、私は質問を更新しました。私はdf1から不一致のレコードを取得する必要があります。 – Shankar

+0

@ツァハゾハール:あなたはこれにいくつかの光を投げることができますか? – Shankar

答えて

1

exceptデータフレーム方式を使用できます。私は、使用する列が単純化のために2つのリストに入っていると仮定しています。両方のリストの順序が正しいことが必要です。列名に関係なく、リスト内の同じ場所の列が比較されます。 exceptの後に、joinを使用して、最初のデータフレームから欠落した列を取得します。

val df1 = Seq(("shankar","12121",28),("ramesh","1212",29),("suresh","1111",30),("aarush","0707",15)) 
    .toDF("name", "empNo", "age") 
val df2 = Seq(("aarush", "12121", 15, "malmo"),("ramesh", "1212", 29, "malmo")) 
    .toDF("eName", "eNo", "age", "city") 

val df1Cols = List("name", "empNo") 
val df2Cols = List("eName", "eNo") 

val tempDf = df1.select(df1Cols.head, df1Cols.tail: _*) 
    .except(df2.select(df2Cols.head, df2Cols.tail: _*))  
val df = df1.join(broadcast(tempDf), df1Cols) 

結果のデータフレームが欲しかったようになります。

+-------+-----+---+ 
| name|empNo|age| 
+-------+-----+---+ 
| aarush| 0707| 15| 
| suresh| 1111| 30| 
|shankar|12121| 28| 
+-------+-----+---+ 
+1

これは本当に便利です。考えられるすべてのシナリオを確認していただき、ありがとうございます。 – Shankar

+0

データセットが巨大な場合はどうすればいいですか...効率的なやり方ですか、それとも結合自体で何かできるのでしょうか? – Shankar

+1

@Shankarこの問題のために、私はいくつかのタイプの 'join'がいつも必要であると信じていますので、これよりずっと効率的ではありません。しかし、 'tempDf'は常に' df1'と同じかそれより小さいので、joinの位置を変更し、 'tempDf'の' broadcast'を追加しました。これは、データフレームのブロードキャストは、ほとんどの場合、効率を高める良いアイデアになることをSparkに示唆します。 – Shaido

0

これをSQLクエリから実行している場合は、SQLクエリ自体の列名をChanging a SQL column title via queryのように再マップします。クエリで単純なテキスト置換を実行して、df1またはdf2列名に正規化することができます。

あなたは差分(例えば年齢)で使用されない複数の列が必要な場合は、 How to obtain the difference between two DataFrames?

のようなものを使用してdiffをすることができ、あなたの差分結果に基づいてデータを再度選択し直すことができたら。これは最適な方法ではないかもしれませんが、うまくいくでしょう。

関連する問題