2017-06-02 15 views
1

私は2つのRDDを持っています。このように:RDDに対応する要素を別のRDDで圧縮する方法は?

RDD1

scala> val rdd1 = spark.sparkContext.parallelize(List(1,1,2,3,4,4)) 
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[101] at parallelize at <console>:23 

それは繰り返される値が含まれています。

RDD2

scala> val rdd2 = spark.sparkContext.parallelize(List(1,2,3,4)) 
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[102] at parallelize at <console>:23 

それは、RDD1中に存在するすべての一意の値が含まれています。

scala> rdd1.distinct.coalesce(rdd2.getNumPartitions).zip(rdd2).collect 
res22: Array[(Int, Int)] = Array((4,1), (1,2), (2,3), (3,4)) 

ここでは3142をビュンされるというように:

今、私はこのようなジップオーバーRDD1RDD2を適用しています。私は期待通りの結果を得るために、どのようにしてzip操作を適用すればよいですか?

+0

に運ぶです。あなたが望むものとの違いは何ですか?単に 'map(x =>(x、x))'を使うだけです。 – Shaido

+0

違いは、** RDD1 **と** RDD2 **を組み合わせなければならない点です。要素が一致する必要があります。基本的に、私はシャッフルせずにそれらに参加したいです。それで、私は彼らに 'zip'をやっているのです。 – himanshuIIITian

+0

分散システムでは、各rddのパーティションを1つだけ定義し、1つのパーティションで実行する場合、分散システムの並列処理の利点はありません。 –

答えて

1

あなたの最初のrdddistinctとなるので、valuesが壊れてしまった場合はshuffledとなります。あなたは何ができるか

は私が問題を完全に理解してないと思うあなたのrdd1pair rddを作成し、sortingを行い、残り

val rdd1 = sc.parallelize(List(1,1,2,3,4,4)).map(x => ("a", x)).distinct.sortBy(_._2).values 
val rdd2 = sc.parallelize(List(1,2,3,4)) 
rdd1.coalesce(rdd2.getNumPartitions).zip(rdd2) 
+0

お返事ありがとうございます!しかし、 'sort()'操作を使わずにそれを実行できますか? – himanshuIIITian

+0

'distinct'を使うと' rdd'が 'suffled'になります。 'zip'のために' order'にするには 'sort'が必要です。 –

+0

あなたは正しいです! – himanshuIIITian

関連する問題