2017-04-07 23 views
1

カスタム関数でSparkで2つのRDDを結合することは可能ですか? 私は文字列をキーとして2つの大きなRDDを持っています。それが不可能な場合は、スパーク・クラスタの利点を引き続き使用します任意の代替はありカスタム関数で2つのRDDを結合する - SPARK

def my_func(a,b): 
    return Lev.distance(a,b) < 2 

result_rdd = rdd1.join(rdd2, my_func) 

:私は次のように参加古典が、カスタム関数を使用していないそれらを結合したいですか? 私はこのようなことを書いていますが、pysparkは私の小さなクラスターに作品を配布することはできません。

def custom_join(rdd1, rdd2, my_func): 
    a = rdd1.sortByKey().collect() 
    b = rdd2.sortByKey().collect() 
    i = 0 
    j = 0 
    res = [] 
    while i < len(a) and j < len(b): 
     if my_func(a[i][0],b[j][0]): 
      res += [((a[i][0],b[j][0]),(a[i][1],b[j][1]))] 
      i+=1 
      j+=1 
     elif a[i][0] < b[j][0]: 
      i+=1 
     else: 
      j+=1 

    return sc.parallelize(res) 

事前のおかげで(と私の英語のため申し訳ありませんが、私はイタリア人だから)

答えて

2

あなたはデカルト使用して、条件に基づいてフィルタリングすることができます。

from pyspark.sql import SparkSession 
spark = SparkSession.builder.getOrCreate() 
sc = spark.sparkContext 
x = sc.parallelize([("a", 1), ("b", 4)]) 
y = sc.parallelize([("a", 2), ("b", 3)]) 

def customFunc(x): 
    # You may use any condition here 
    return x[0][0] ==x[1][0] 

print(x.join(y).collect()) # normal join 
# replicating join with cartesian 
print(x.cartesian(y).filter(customFunc).flatMap(lambda x:x).groupByKey().mapValues(tuple).collect()) 

出力:

[('b', (4, 3)), ('a', (1, 2))] 
[('a', (1, 2)), ('b', (4, 3))] 
+0

おかげで、私はデカルト積が参加に比べて非常に非効率的になると思います。私は約2Mのエントリを持つデータベースに取り組んでいます。 –

+0

データフレームAPIを使用することはできますか? – Himaprasoon

+0

データフレームはクラスタ計算と互換性がありますか? –