コードは以下の通りです。 コードの振る舞いは、0
からn
までの数の累乗を生成することです。PySparkの繰り返し計算
C
は、 - サイズl + 1
のすべての可能なスーパーセットをC
におけるセットの列挙とC
に戻って格納されている反復l
にサイズl
の全てのセットから成るRDDを表します。
いくつかのセットは、この例ではランダムジェネレータの出力で示された条件に基づいて列挙から除外されます。
from pyspark import SparkConf
from pyspark import SparkContext
from bitarray import bitarray
import random
def setadd(u, i):
r = u.copy()
r[i] = 1
return r
def stringToBit(u):
r = bitarray()
r.frombytes(u)
return r
def mapFunc(it):
global bdTH
global bdN
for s in it:
s = stringToBit(s[0])
print(s)
r = random.randint(1, 10)
# elimination criteria
if r < bdTH.value:
continue
xmax = n - 1
while not s[xmax]:
xmax -= 1
for x in xrange(xmax + 1, bdN.value):
if s[x]:
continue
ns = setadd(s, x)
yield (ns.tobytes(), 0)
def main(sc, n):
phi = bitarray('0') * n
C = [(setadd(phi, x).tobytes(), 0) for x in xrange(n)]
print(C)
C = sc.parallelize(C)
global bdN
bdN = sc.broadcast(n)
global bdTH
bdTH = sc.broadcast(random.randint(1, 10))
l = 1
while l <= n:
C = C.partitionBy(100)\
.mapPartitions(mapFunc)
l += 1
if C.count():
print('count: ' + str(C.count()))
else:
print('count: 0')
bdTH = sc.broadcast(random.randint(1, 10))
if __name__ == "__main__":
conf = SparkConf()
conf = conf.setAppName("test")
sc = SparkContext(conf = conf)
n = 5
main(sc, n)
sc.stop()
問題:コードとして 1.二回任意のサブセットを評価しない保証するために確かにあります。ただし、出力は、特定のセットが2回評価されることを示します。
2.変数bdTH
のbroadcast
は、C
が生成され、l
の反復処理が行われた後にのみ送信されることが保証されているか、またはSparkがいくつかの最適化を実行できます。
箇条書きリストで言語をクリアすることはできますか? –