2016-10-30 5 views
1

私はPySparkを使ってEratosthenesのSieveを実装しようとしています。PySpark RDDフィルタリングされた要素が戻ってくる

これは私のRDDに多くのfilterを適用しようとしていますが、各繰り返しで、以前の繰り返しでフィルタリングされたものは何度も戻ってきます。ここで

はコードです:

from math import ceil 
from math import sqrt 

min_number = 2 
max_number = 101 

rdd = sc.parallelize(range(min_number, max_number), 4) 
pivot = min_number 

max_pivot = ceil(sqrt(max_number)) 

while pivot <= max_pivot: 
    print "RDD for pivot = " + str(pivot) + ":" 
    rdd = rdd.filter(lambda x: x <= pivot or x % pivot != 0) 
    pivot = rdd.filter(lambda x: x > pivot).reduce(min) 
    rdd.collect() 

そして出力:

Pivot = 2 
[2, 3, 4, 5, 7, 8, 10, 11, 13, 14, 16, 17, 19, 20, 22, 23, 25, 26, 28, 29, 31, 32, 34, 35, 37, 38, 40, 41, 43, 44, 46, 47, 49, 50, 52, 53, 55, 56, 58, 59, 61, 62, 64, 65, 67, 68, 70, 71, 73, 74, 76, 77, 79, 80, 82, 83, 85, 86, 88, 89, 91, 92, 94, 95, 97, 98, 100] 
Pivot = 3 
[2, 3, 4, 5, 6, 7, 9, 10, 11, 13, 14, 15, 17, 18, 19, 21, 22, 23, 25, 26, 27, 29, 30, 31, 33, 34, 35, 37, 38, 39, 41, 42, 43, 45, 46, 47, 49, 50, 51, 53, 54, 55, 57, 58, 59, 61, 62, 63, 65, 66, 67, 69, 70, 71, 73, 74, 75, 77, 78, 79, 81, 82, 83, 85, 86, 87, 89, 90, 91, 93, 94, 95, 97, 98, 99] 
Pivot = 4 
[2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13, 14, 16, 17, 18, 19, 21, 22, 23, 24, 26, 27, 28, 29, 31, 32, 33, 34, 36, 37, 38, 39, 41, 42, 43, 44, 46, 47, 48, 49, 51, 52, 53, 54, 56, 57, 58, 59, 61, 62, 63, 64, 66, 67, 68, 69, 71, 72, 73, 74, 76, 77, 78, 79, 81, 82, 83, 84, 86, 87, 88, 89, 91, 92, 93, 94, 96, 97, 98, 99] 
Pivot = 5 
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 13, 14, 15, 16, 17, 19, 20, 21, 22, 23, 25, 26, 27, 28, 29, 31, 32, 33, 34, 35, 37, 38, 39, 40, 41, 43, 44, 45, 46, 47, 49, 50, 51, 52, 53, 55, 56, 57, 58, 59, 61, 62, 63, 64, 65, 67, 68, 69, 70, 71, 73, 74, 75, 76, 77, 79, 80, 81, 82, 83, 85, 86, 87, 88, 89, 91, 92, 93, 94, 95, 97, 98, 99, 100] 
Pivot = 6 
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 15, 16, 17, 18, 19, 20, 22, 23, 24, 25, 26, 27, 29, 30, 31, 32, 33, 34, 36, 37, 38, 39, 40, 41, 43, 44, 45, 46, 47, 48, 50, 51, 52, 53, 54, 55, 57, 58, 59, 60, 61, 62, 64, 65, 66, 67, 68, 69, 71, 72, 73, 74, 75, 76, 78, 79, 80, 81, 82, 83, 85, 86, 87, 88, 89, 90, 92, 93, 94, 95, 96, 97, 99, 100] 
Pivot = 7 
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 17, 18, 19, 20, 21, 22, 23, 25, 26, 27, 28, 29, 30, 31, 33, 34, 35, 36, 37, 38, 39, 41, 42, 43, 44, 45, 46, 47, 49, 50, 51, 52, 53, 54, 55, 57, 58, 59, 60, 61, 62, 63, 65, 66, 67, 68, 69, 70, 71, 73, 74, 75, 76, 77, 78, 79, 81, 82, 83, 84, 85, 86, 87, 89, 90, 91, 92, 93, 94, 95, 97, 98, 99, 100] 
Pivot = 8 
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 19, 20, 21, 22, 23, 24, 25, 26, 28, 29, 30, 31, 32, 33, 34, 35, 37, 38, 39, 40, 41, 42, 43, 44, 46, 47, 48, 49, 50, 51, 52, 53, 55, 56, 57, 58, 59, 60, 61, 62, 64, 65, 66, 67, 68, 69, 70, 71, 73, 74, 75, 76, 77, 78, 79, 80, 82, 83, 84, 85, 86, 87, 88, 89, 91, 92, 93, 94, 95, 96, 97, 98, 100] 
Pivot = 9 
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 21, 22, 23, 24, 25, 26, 27, 28, 29, 31, 32, 33, 34, 35, 36, 37, 38, 39, 41, 42, 43, 44, 45, 46, 47, 48, 49, 51, 52, 53, 54, 55, 56, 57, 58, 59, 61, 62, 63, 64, 65, 66, 67, 68, 69, 71, 72, 73, 74, 75, 76, 77, 78, 79, 81, 82, 83, 84, 85, 86, 87, 88, 89, 91, 92, 93, 94, 95, 96, 97, 98, 99] 
Pivot = 10 
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 100] 
Pivot = 11 
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 97, 98, 99, 100] 

あなたが各反復で、見ることができるように、現在のピボットの倍数だけが除外されているが、持っていた数字それぞれの反復でrddの参照を置き換えても、すでにフィルタリングされています。

私はPython 2.7.1をMac用にPySpark 2.0.1を実行しています。

ありがとうございます!

+0

、あなたはピボットを使用して値を除外しようとしていますか? –

+0

@AlbertoBonsantoはい、そうです –

答えて

3

Pythonクロージャは、関数が呼び出されたときに評価され、関数が呼び出されたときに評価されます(late binding)。第1に

(sc.parallelize(range(min_number, max_number), 4) 
    .filter(lambda x: x <= 2 or x % 2 != 0)) 

:最初の反復rddにおける結果

があると評価される第一

(sc.parallelize(range(min_number, max_number), 4) 
    .filter(lambda x: x <= 3 or x % 3 != 0) 
    .filter(lambda x: x <= 3 or x % 3 != 0)) 

(sc.parallelize(range(min_number, max_number), 4) 
    .filter(lambda x: x <= 4 or x % 4 != 0) 
    .filter(lambda x: x <= 4 or x % 4 != 0) 
    .filter(lambda x: x <= 4 or x % 4 != 0)) 

毎回pivotは現在のスコープで解決されます。

正しい実装:だから

while pivot <= max_pivot: 
    def f(x, pivot=pivot): 
     return x <= pivot or x % pivot != 0 

    rdd = rdd.filter(f) 
    pivot = rdd.filter(lambda x: x > pivot).min() 
関連する問題