2016-08-03 3 views
1

私は2つのデータフレームを持ってAとBと言うことができます。は効率的に結合され、他のデータフレームとのデータフレームのデータを結合しません

キー上でBと結合するデータフレームAと結合されなかったレコードからレコードを取得したいと思います。

これは単一のクエリで実行できますか? 同じデータを2回上書きするとパフォーマンスが低下するため、データフレームAのサイズはBよりはるかに大きいです。 データフレームBのサイズは約50Gb〜100GBです。 そのため、私はBを放送できません。

AのデータがBで結合されているかどうかを示す "Yes"または "No"の値を持つ結合列 "結合"を持つ単一のDataframe Cを得ることができます。

Aに重複がある場合はどうなりますか?私はそれらを望んでいません。 私はrecudeByKeyをCのデータフレームの後で行うことを考えていました。その周りの任意の提案?

私はハイブテーブルを使用して、HDFSにORCファイル形式のデータを保存しています。 コードをスカラーで書く。

+0

から列を削除するにはdropの行を追加している私は、キーと記録にBと結合データフレームAからレコードを取得したいです参加していない人もいます(これらはDataframe A自体のレコードです)。 私はBだけではなく、Aからの行を必要とします。Bと結合したAからの行と、Bと結合しなかったものがBで一致するかどうかを列でマークしました。 – grv

答えて

4

はい、あなただけの左外部結合を実行する必要があります。

import sqlContext.implicits._ 

val A = sc.parallelize(List(("id1", 1234),("id1", 1234),("id3", 5678))).toDF("id1", "number") 
val B = sc.parallelize(List(("id1", "Hello"),("id2", "world"))).toDF("id2", "text") 

val joined = udf((id: String) => id match { 
    case null => "No" 
    case _ => "Yes" 
}) 

val C = A 
    .distinct 
    .join(B, 'id1 === 'id2, "left_outer") 
    .withColumn("joined",joined('id2)) 
    .drop('id2) 
    .drop('text) 

これは、このようになります。データフレームC:[id1: string, number: int, joined: string]得られます:私はdistinctを追加した

[id1,1234,Yes] 
[id3,5678,No] 

注意をAに重複を除外し、最後の列にはCが含まれているかどうかを確認してください。

EDIT:OPからの発言の後、私はB.

+0

B.私が興味を持っているスキーマは Aのスキーマにあります。それは "joined"です。 – grv

+0

@grv私の答えがあなたが探していたものであれば、あなたはそれを受け入れられた答えとしてマークできますか?それ以外の場合は、その答えに伴う問題を明確にしてください。 –

+0

私はもう一度助けが必要でした。 キーに基づいてテーブルBにないレコードをテーブルAに入れたい場合はどうすればいいですか? それを見る他の方法は、レコード全体ではなくキー上のexcept節です。 – grv

関連する問題