2017-04-09 21 views
0

コードは以下の通りです。 コードの振る舞いは、0からnまでの数の累乗を生成することです。PySparkの繰り返し計算

Cは、 - サイズl + 1のすべての可能なスーパーセットをCにおけるセットの列挙とCに戻って格納されている反復lにサイズl

の全てのセットから成るRDDを表します。

いくつかのセットは、この例ではランダムジェネレータの出力で示された条件に基づいて列挙から除外されます。

from pyspark import SparkConf 
from pyspark import SparkContext 

from bitarray import bitarray 
import random 

def setadd(u, i): 
    r = u.copy() 
    r[i] = 1 
    return r 

def stringToBit(u): 
    r = bitarray() 
    r.frombytes(u) 
    return r 

def mapFunc(it): 
    global bdTH 
    global bdN 

    for s in it: 
     s = stringToBit(s[0]) 
     print(s) 
     r = random.randint(1, 10) 
     # elimination criteria 
     if r < bdTH.value: 
      continue 

     xmax = n - 1 
     while not s[xmax]: 
      xmax -= 1 

     for x in xrange(xmax + 1, bdN.value): 
      if s[x]: 
       continue 
      ns = setadd(s, x) 
      yield (ns.tobytes(), 0) 

def main(sc, n): 
    phi = bitarray('0') * n 
    C = [(setadd(phi, x).tobytes(), 0) for x in xrange(n)] 
    print(C) 
    C = sc.parallelize(C) 

    global bdN 
    bdN = sc.broadcast(n) 

    global bdTH 
    bdTH = sc.broadcast(random.randint(1, 10)) 

    l = 1 
    while l <= n: 
     C = C.partitionBy(100)\ 
      .mapPartitions(mapFunc) 

     l += 1 

     if C.count(): 
      print('count: ' + str(C.count())) 
     else: 
      print('count: 0') 

     bdTH = sc.broadcast(random.randint(1, 10)) 


if __name__ == "__main__": 
    conf = SparkConf() 
    conf = conf.setAppName("test") 
    sc = SparkContext(conf = conf) 

    n = 5 
    main(sc, n) 
    sc.stop() 

問題:コードとして 1.二回任意のサブセットを評価しない保証するために確かにあります。ただし、出力は、特定のセットが2回評価されることを示します。
2.変数bdTHbroadcastは、Cが生成され、lの反復処理が行われた後にのみ送信されることが保証されているか、またはSparkがいくつかの最適化を実行できます。

Issue1

答えて

0

スパークRDDSは、キャッシュされたおよび/またはチェックポイントされていない限り、彼らの完全な系譜はRDDがアクセスされるたびに評価され、怠け者だと。あなたはpartitionスパークシャッフルファイルを再利用することができますので、

  • C_i.count()
  • についてC_i+1.count()

について:

あなたがキャッシュされていないためとCは一度C_iを評価する以前のCsスパークに依存しますそれ以外の場合は、最初のRDDまで再帰的に戻ります。

+0

箇条書きリストで言語をクリアすることはできますか? –

関連する問題