2017-03-15 11 views
0

私はそれらを一緒に結合する必要がある2つのrddを持っています。 pysparkは特定のキーでrddsを結合します

[(u'1', u'2'), (u'1', u'3')] 

RDD2

RDD1

[(u'2', u'100', 2), 
(u'1', u'300', 1), 
(u'1', u'200', 1)] 

を私の所望の出力は次のとおりです:彼らは、以下のようになり

[(u'1', u'2', u'100', 2)] 

だから私が持っているRDD2からのものを選択したいと思いますRDD1の同じ第2の値。私は参加しようとしましたが、デカルトとは何も働いていないし、私が探しているものに近くなっていません。私はスパークには新しく、あなたからの助けに感謝します。

ありがとうございます。

+0

を行うためにjoinを使用するキーを圧縮しますか? – titipata

答えて

2

あなたのプロセスは手動のようです。ここではサンプルコードです: -

rdd = sc.parallelize([(u'2', u'100', 2),(u'1', u'300', 1),(u'1', u'200', 1)]) 
rdd1 = sc.parallelize([(u'1', u'2'), (u'1', u'3')]) 
newRdd = rdd1.map(lambda x:(x[1],x[0])).join(rdd.map(lambda x:(x[0],(x[1],x[2])))) 
newRdd.map(lambda x:(x[1][0], x[0], x[1][1][0], x[1][1][1])).coalesce(1).collect() 

OUTPUT: -

[(u'1', u'2', u'100', 2)] 
2

DATAFRAMEあなたは、溶液中のスパークのデータフレームを使用して許可した場合。与えられたRDDをデータフレームに変換して、対応する列に結合することができます。

df1 = spark.createDataFrame(rdd1, schema=['a', 'b', 'c']) 
df2 = spark.createDataFrame(rdd2, schema=['d', 'a']) 
rdd_join = df1.join(df2, on='a') 
out = rdd_join.rdd.collect() 

RDDはちょうどあなたが最初の要素に参加して、単にあなたがこのソリューションのSparkデータフレームを使用することができます参加

rdd1_zip = rdd1.map(lambda x: (x[0], (x[1], x[2]))) 
rdd2_zip = rdd2.map(lambda x: (x[1], x[0])) 
rdd_join = rdd1_zip.join(rdd2_zip) 
rdd_out = rdd_join.map(lambda x: (x[0], x[1][0][0], x[1][0][1], x[1][1])).collect() # flatten the rdd 
print(rdd_out) 
+0

これもうまく見えますが、私はデータフレームではなくrddsオペレーションを使用したソリューションを望んでいました。 – dagg3r

+0

@ dagg3r間違いなく、私はRDDを使って解決できれば回答を更新します! – titipata

関連する問題