2016-08-19 27 views
2

私はこのようなRDDを持っている:sparkでグループ化して追加するには?

{"key1" : "fruit" , "key2" : "US" , "key3" : "1" } 

{"key1" : "fruit" , "key2" : "US" , "key3" : "2" } 

{"key1" : "vegetable" , "key2" : "US" , "key3" : "1" } 

{"key1" : "fruit" , "key2" : "Japan" , "key3" : "3" } 

{"key1" : "vegetable" , "key2" : "Japan" , "key3" : "3" } 

私の目標は、KEY2によってKEY1し、グループで 最初のグループに あり、最終的にKEY3を追加します。値のフォーマット:私のコードは次のように始まる

key1   key2  key3 
"fruit"  , "US" , 3 
"vegetable" , "US" , 1 
"fruit"  , "Japan" , 3 
"vegetable" , "Japan" , 3 

、同じよう

は私が

rdd_arm = rdd_arm.map(lambda x: x[1]) 

rdd_armは、上記のキーが含まれ、最終的な結果を期待しています。

次はどこに行くのかわかりません。 誰かが私を助けてくれますか?

答えて

1

のは、あなたのRDDを作成してみましょう:

In [1]: rdd_arm = sc.parallelize([{"key1" : "fruit" , "key2" : "US" , "key3" : "1" }, {"key1" : "fruit" , "key2" : "US" , "key3" : "2" }, {"key1" : "vegetable" , "key2" : "US" , "key3" : "1" }, {"key1" : "fruit" , "key2" : "Japan" , "key3" : "3" }, {"key1" : "vegetable" , "key2" : "Japan" , "key3" : "3" }]) 
In [2]: rdd_arm.collect() 
Out[2]: 
[{'key1': 'fruit', 'key2': 'US', 'key3': '1'}, 
{'key1': 'fruit', 'key2': 'US', 'key3': '2'}, 
{'key1': 'vegetable', 'key2': 'US', 'key3': '1'}, 
{'key1': 'fruit', 'key2': 'Japan', 'key3': '3'}, 
{'key1': 'vegetable', 'key2': 'Japan', 'key3': '3'}] 

まず、あなたはkey1key2のペアになり、新しいキーを作成する必要があります。あなたはこのような何かやりたいので、それの値は、key3次のようになります。

:次に

In [3]: new_rdd = rdd_arm.map(lambda x: (x['key1'] + ", " + x['key2'], x['key3'])) 

In [4]: new_rdd.collect() 
Out[4]: 
[('fruit, US', '1'), 
('fruit, US', '2'), 
('vegetable, US', '1'), 
('fruit, Japan', '3'), 
('vegetable, Japan', '3')] 

を、我々は単にこのように、reduceByKey()を呼び出すことが、重複しているキーの値を追加したいです

In [5]: new_rdd = new_rdd.reduceByKey(lambda a, b: int(a) + int(b)) 

In [6]: new_rdd.collect() 
Out[6]: 
[('fruit, US', 3), 
('fruit, Japan', '3'), 
('vegetable, US', '1'), 
('vegetable, Japan', '3')] 

とさせていただきました!もちろん


、これはこのように、ワンライナーのようになります。

new_rdd = rdd_arm.map(lambda x: (x['key1'] + ", " + x['key2'], x['key3'])).reduceByKey(lambda a, b: int(a) + int(b)) 
+1

こんにちはgsamaras。フォローアップありがとうございます。 –

2

私はそれを自分で解決しました。

複数のキーを含むキーを作成してから追加しなければなりませんでした。

rdd_arm.map(lambda x : x[0] + ", " + x[1] , x[2]).reduceByKey(lambda a,b : a + b) 

下記の質問は有用であった。

How to group by multiple keys in spark?

+0

私はこれが私のために動作しませんでした、私は未定義の名前のエラーを取得し、取得した後だったと言うことを許可私はそれを働かせることができませんでした。その結果、私は新しい回答を投稿しました。私はそれが私の練習を作ったので、しかし、質問をupvoted!ありがとう! – gsamaras

関連する問題