2017-11-16 9 views
0

インスタンスのように:pysparkのjoin関数を使って2つの鍵で2つのrddを結合できますか?

rdd1 = [('magnus', 'nordea', 13000), ('Erik', 'nordea', 13000), ('Ola', 'nordea', 19000), ('rohit', 'nordea', 23030)] 
rdd2 = [('magnus', 'jpmc', 14000), ('Erik', 'jpmc', 2100), ('Ola', 'jpmc', 18400), ('rohit', 'jpmc', 25000)] 


rdd1.join(rdd2).collect() gives me : 
[('Ola', ('nordea', 'jpmc')), ('Erik', ('nordea', 'jpmc')), ('rohit', ('nordea', 'jpmc')), ('magnus', ('nordea', 'jpmc'))] 

上記RDDその量値を失う:(

必要な出力:

[(('Ola','nordea'),13000), 
(('Ola','jpmc'),14000), 
(('Erik','nordea'),13000), 
(('Erik','jpmc'),2100), 
(('rohit','nordea'),23030), 
(('rohit','jpmc'),25000), 
(('magnus','nordea'),13000), 
(('magnus', 'jpmc'),14000)] 

その上の任意の提案や、私が参照すべきかの機能を?

+0

へようこそSO。 – desertnaut

答えて

0
spark.version 
# u'2.2.0' 

rdd1 = sc.parallelize([('magnus', 'nordea', 13000), ('Erik', 'nordea', 13000), ('Ola', 'nordea', 19000), ('rohit', 'nordea', 23030)]) 
rdd2 = sc.parallelize([('magnus', 'jpmc', 14000), ('Erik', 'jpmc', 2100), ('Ola', 'jpmc', 18400), ('rohit', 'jpmc', 25000)]) 

rdd1.union(rdd2).map(lambda x: ((x[0], x[1]), x[2])).collect() 

結果:

[(('magnus', 'nordea'), 13000), 
(('Erik', 'nordea'), 13000), 
(('Ola', 'nordea'), 19000), 
(('rohit', 'nordea'), 23030), 
(('magnus', 'jpmc'), 14000), 
(('Erik', 'jpmc'), 2100), 
(('Ola', 'jpmc'), 18400), 
(('rohit', 'jpmc'), 25000)] 
+0

私はそれをさらにリリートし、私の期待される結果を得ました: nordeardd = rdd1_temp.join(rdd2_temp).map(ラムダx:((x [ 0]、x [1] [0])、x [1] [0] [1])) jpmcrdd = rdd1_temp.join(rdd2_temp).map(ラムダx:((x [0]、x [1] [1] [0])、x [1] [1])) result = nordeardd.union(jpmcrdd) result.collect( [( 'Ola'、 'nordea' )、19000)、(( '' Erik '、' nordea ')、13000)、(('ロヒット '、'ノルデア ')、23030)、((' magnus '、' nordea ' ( '' rohit '、' jpmc ')、25000)、((' magnus '、' jpmc ')、 14000)] これはコードが多く、パフォーマンス上の問題がある可能性があります。我々は –

+0

@ S.Jainを実装することができれば、より良いトリック私の一時的なRDDsと一緒にそれを忘れてください。更新された答えを(そして親切にそれを受け入れる) – desertnaut

0
 rdd1.union(rdd2).collect() 

出力:

[('magnus', 'nordea', 13000), 
    ('Erik', 'nordea', 13000), 
    ('Ola', 'nordea', 19000), 
    ('rohit', 'nordea', 23030), 
    ('magnus', 'jpmc', 14000), 
    ('Erik', 'jpmc', 2100), 
    ('Ola', 'jpmc', 18400), 
    ('rohit', 'jpmc', 25000)] 
関連する問題