2017-12-30 15 views
0

私はUbuntuでjupyterを使っています。マルチキーとシングル値のpysparkでreduceByKeyを使うにはどうすればいいですか?

だから私は、次の問題を抱えている、これは私のコードです:

from pyspark import SparkContext 
sc = SparkContext.getOrCreate() 
ut = sc.textFile("hdfs://localhost:54310/hduser/firstnames") 
rows= ut.map(lambda line: line.split(";")) 
res = rows.filter(lamda row: row[2] >= "2000" and row[2] <= "2004") 
res = res.map(lambda row: ({row[1],row[2]},int(row[3]))) 

出力:

[({'2001', 'Brussel'}, 113), 
({'2001', 'Vlaanderen'}, 16), 
({'2002', 'Brussel'}, 12)] 

I:

[({'2001', 'Brussel'}, 9), 
({'2001', 'Brussel'}, 104), 
({'2001', 'Vlaanderen'}, 16), 
({'2002', 'Brussel'}, 12), ...] 

私は私の出力は次のようにする必要があります以前はreduceByKeyでいくつか試してみましたが、 はreduceByKey、buについて多くの質問を受けましたそれを理解できなかった。前もって感謝します。

答えて

0

A list as a key for PySpark's reduceByKeyzero323で説明したように、キーはハッシュ方式を実装する必要があります。あなたはtuplesを使用することができます。

>>> from operator import add 
... 
... sc.parallelize([ 
...  (('2001', 'Brussel'), 9), (('2001', 'Brussel'), 104), 
...  (('2001', 'Vlaanderen'), 16), (('2002', 'Brussel'), 12) 
... ]).reduceByKey(add).take(2) 
... 
[(('2002', 'Brussel'), 12), (('2001', 'Brussel'), 113)] 

置き換えます

res.map(lambda row: ({row[1],row[2]},int(row[3]))) 

res.map(lambda row: ((row[1], row[2]), int(row[3]))) 

または set frozensetと置き換える:

>>> sc.parallelize([ 
...  (frozenset(['2001', 'Brussel']), 9), (frozenset(['2001', 'Brussel']), 104), 
...  (frozenset(['2001', 'Vlaanderen']), 16), (frozenset(['2002', 'Brussel']), 12) 
... ]).reduceByKey(add).take(2) 

[(frozenset({'2002', 'Brussel'}), 12), (frozenset({'2001', 'Brussel'}), 113)] 
+0

感謝を!今すぐうまくいく! –

関連する問題