2017-12-03 8 views
1

回答が見つかりません。 私にRDDがある場合pyspark:1つのRDDを複数のRDDに分割する

rdd = sc.parallelize([('a', [1,2,3]), ('b',[4,5,6])]) 

各値はリストです。 今、各値はリストの要素の一つであり、キーと一致する場所、それが

sc.parallelize([('a',1),('a',2),('a',3),('b',4),('b',5),('b'6)]) 

なるように、RDDを分割する方法があります。 私は大まかにその解決策を知っています。私たちは、最初の

a = rdd.collect() 

collect()、その後は

rdd2 = sc.parallelize([x for x in a]) 

としてRDDを再割り当てすることができます。しかしRDDが巨大である場合、collect()は非常に時間がかかります。私たちはそれを規模で考える必要があります。それを行うための分散方法はありますか? (lambda function ..などを使用するような)ありがとう!

答えて

2

それはflatMapのためのタスクです:

lamb = lambda x: [(x[0], v) for v in x[1]] 

lamb(('a', [1,2,3])) 
# [('a', 1), ('a', 2), ('a', 3)] 

flatMap:ここ


ラムダ関数は、元のRDDから一つのキーと値のペアを取り、個々の値にキーをマップ

rdd.flatMap(lambda x: [(x[0], v) for v in x[1]]).collect() 
# [('a', 1), ('a', 2), ('a', 3), ('b', 4), ('b', 5), ('b', 6)] 
この操作を各キー値ペアにマップし、結果をフラット化します。

+1

ありがとうございました!それがまさに私が必要なものです! – Hsiang

関連する問題