2017-05-16 12 views
-3

私がスパークする新たなんだと私は他のすべてのRDDSに表示される項目が含まれている最終RDDをフィルタ処理しようとしています。すべてRDDSに表示される項目を取得します - Pyspark

私のコード

a = ['rs1','rs2','rs3','rs4','rs5'] 
b = ['rs3','rs7','rs10','rs4','rs6'] 
c = ['rs10','rs13','rs20','rs16','rs1'] 
d = ['rs2', 'rs4', 'rs5', 'rs13', 'rs3'] 

a_rdd = spark.parallelize(a) 
b_rdd = spark.parallelize(b) 
c_rdd = spark.parallelize(c) 
d_rdd = spark.parallelize(d) 

rdd = spark.union([a_rdd, b_rdd, c_rdd, d_rdd]).distinct() 

結果:[ 'RS4'、 'RS16'、 'RS5'、 'RS6'、 'RS7'、 'RS20'、 'RS1'、 'RS13'、「RS10 」、 'RS2'、 'RS3']

私の予想結果は[ある 'RS3'、 'RS4']

をありがとう!

+0

です。 https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.distinct。内側の結合をチェックしてみてください。私の悪い、それはそのAPIドキュメントのページを見つけられませんでした –

+0

、私はそれに多くの時間を費やします、私はあなたがこのように追加することができますreduce' 'への提案持ってあなたに – pthphap

答えて

1

、あなたが交差点を意味し、すべてのRDDS内の項目が含まれているRDDをしたいと言いますか?その場合は、あなたは、労働組合とあなたのRDDSの交点を使用してはならない空である(どの要素があなたの4 RDDSで繰り返されていない)

いますが、RDDSの交差点を行う必要がある場合:

def intersection(*args): 
     return reduce(lambda x,y:x.intersection(y),args) 

    a = ['rs1','rs2','rs3','rs4','rs5'] 
    b = ['rs3','rs7','rs1','rs2','rs6'] 
    c = ['rs10','rs13','rs2','rs16','rs1'] 
    d = ['rs2', 'rs4', 'rs1', 'rs13', 'rs3'] 

    a_rdd = sc.parallelize(a) 
    b_rdd = sc.parallelize(b) 
    c_rdd = sc.parallelize(c) 
    d_rdd = sc.parallelize(d) 

    rdd = sc.union([a_rdd, b_rdd, c_rdd, d_rdd]).distinct() 
    intersection(a_rdd, b_rdd, c_rdd, d_rdd).collect() 

出力は、私はあなたがドキュメントについての詳細を読むことをお勧め[「RS1」、「RS2」]

+0

ありがとう:'減らす(RDD.intersection、引数を) ' –

+1

はいああそれを行うよりエレガントな方法です:) –

+0

これは魅力のように動作します。ありがとうございました – pthphap

関連する問題