2016-10-12 24 views
1

2つのRDDをキーで結合する方法を探しています。PySparkで2つのRDDの完全な外部結合を行うには?

は考える:ID、国コードと郵便番号:

x = sc.parallelize([('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'FR', '75001'), 
        ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', 'TN', '8160'), 
        ] 
       ) 

y = sc.parallelize([('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', 'JmJCFu3N'), 
        ('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', 'KNPQLQth'), 
        ('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'KlGZj08d'), 
        ] 
       ) 

だから私は情報の3種類があります。 RDDの完全な外部結合が必要です。 これは私のコードです:

sorted(x.fullOuterJoin(y, numPartitions = None).collect()) 

そして、これが結果です:

[('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', ('TN', None)), 
('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', ('FR', 'KlGZj08d')), 
('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', (None, 'KNPQLQth')), 
('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', (None, 'JmJCFu3N'))] 

それは郵便番号が加入後に消失したことを奇妙です! 何が間違っている可能性がありますか?

x.union(y).collect() 

います:

[('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', ('TN', '8160', None)), 
('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', ('FR', '75001', 'KlGZj08d')), 
('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', (None, None, 'KNPQLQth')), 
('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', (None, None, 'JmJCFu3N'))] 

は、私は他の事やってみました:

私の結果は次のようになり、理想的である必要があり

[('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'FR', '75001'), 
('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', 'TN', '8160'), 
('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', 'JmJCFu3N'), 
('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', 'KNPQLQth'), 
('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'KlGZj08d')] 

をそして私は今、何をしたいですgroupByKeyまたはreduceByKeyです。

これはエラーメッセージを与えるコードである:ただし

sorted(x.union(y).groupByKey().mapValues(list).collect()) 

、一部x.union(Y).groupByKey()が動作するように見えた..

enter image description here

結果を印刷する方法はありますか? (collect()は動作しません) 何か助けに感謝します。どうも !

答えて

1

いくつかの状況で役立つことができコグループがあります:

cogrouped = x.cogroup(y) 

cogrouped.mapValues(lambda x: (list(x[0]), list(x[1]))).collect() 
+0

森戸:結果:[( '_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ ='、([]、[ 'JmJCFu3N']))、 ( '_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE ='、 ([ 'FR']、[ 'KlGZj08d']))、 ( '_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4 ='、([ 'TN']、[]))、 ( '、'([]、['KNPQLQth']))] '_guid_hG88Yt5EUsqT8a06Cy380ga​​3XHPwaFylNyuvvqDslCw ='、 – DataAddicted

0

私は解決策を見つけました!それにもかかわらず、このソリューションは私がしたいことに対して完全に満足できるものではありません。

ので:

x = sc.parallelize([('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'FR', '75001'), 
       ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', 'TN', '8160'), 
       ] 
      ) 
y = sc.parallelize([('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', 'JmJCFu3N'), 
       ('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', 'KNPQLQth'), 
       ('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'KlGZj08d'), 
       ] 
      ) 

私は "×" という名前の私のRDDになります私のキーを指定するために機能を作成しました:

def get_keys(rdd): 

    new_x = rdd.map(lambda item: (item[0], (item[1], item[2]))) 
    return new_x 

new_x = get_keys(x) 

与える:

[('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', ('FR', '75001')), 
('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', ('TN', '8160'))] 

次に:

new_x.union(y).map(lambda (x, y): (x, [y])).reduceByKey(lambda p, q : p + q).collect() 

結果:

[('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', ['JmJCFu3N']), 
('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', [('FR', '75001'), 'KlGZj08d']), 
('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', [('TN', '8160')]), 
('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', ['KNPQLQth'])] 

私は何がしたいことは次のとおりです。

[('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', (None, None, 'JmJCFu3N')), 
('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', ('FR', '75001', 'KlGZj08d')), 
('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', ('TN', '8160', None)), 
('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', (None, None, 'KNPQLQth'))] 
関連する問題