のは、あなたのRDDを作成してみましょう:
In [1]: rdd_arm = sc.parallelize([{"key1" : "fruit" , "key2" : "US" , "key3" : "1" }, {"key1" : "fruit" , "key2" : "US" , "key3" : "2" }, {"key1" : "vegetable" , "key2" : "US" , "key3" : "1" }, {"key1" : "fruit" , "key2" : "Japan" , "key3" : "3" }, {"key1" : "vegetable" , "key2" : "Japan" , "key3" : "3" }])
In [2]: rdd_arm.collect()
Out[2]:
[{'key1': 'fruit', 'key2': 'US', 'key3': '1'},
{'key1': 'fruit', 'key2': 'US', 'key3': '2'},
{'key1': 'vegetable', 'key2': 'US', 'key3': '1'},
{'key1': 'fruit', 'key2': 'Japan', 'key3': '3'},
{'key1': 'vegetable', 'key2': 'Japan', 'key3': '3'}]
まず、あなたはkey1
とkey2
のペアになり、新しいキーを作成する必要があります。あなたはこのような何かやりたいので、それの値は、key3
次のようになります。
:次に
In [3]: new_rdd = rdd_arm.map(lambda x: (x['key1'] + ", " + x['key2'], x['key3']))
In [4]: new_rdd.collect()
Out[4]:
[('fruit, US', '1'),
('fruit, US', '2'),
('vegetable, US', '1'),
('fruit, Japan', '3'),
('vegetable, Japan', '3')]
を、我々は単にこのように、reduceByKey()を呼び出すことが、重複しているキーの値を追加したいです
In [5]: new_rdd = new_rdd.reduceByKey(lambda a, b: int(a) + int(b))
In [6]: new_rdd.collect()
Out[6]:
[('fruit, US', 3),
('fruit, Japan', '3'),
('vegetable, US', '1'),
('vegetable, Japan', '3')]
とさせていただきました!もちろん
、これはこのように、ワンライナーのようになります。
new_rdd = rdd_arm.map(lambda x: (x['key1'] + ", " + x['key2'], x['key3'])).reduceByKey(lambda a, b: int(a) + int(b))
こんにちはgsamaras。フォローアップありがとうございます。 –