私は2つのRDDを持っています。このように:RDDに対応する要素を別のRDDで圧縮する方法は?
RDD1
scala> val rdd1 = spark.sparkContext.parallelize(List(1,1,2,3,4,4))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[101] at parallelize at <console>:23
それは繰り返される値が含まれています。
RDD2
scala> val rdd2 = spark.sparkContext.parallelize(List(1,2,3,4))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[102] at parallelize at <console>:23
それは、RDD1中に存在するすべての一意の値が含まれています。
scala> rdd1.distinct.coalesce(rdd2.getNumPartitions).zip(rdd2).collect
res22: Array[(Int, Int)] = Array((4,1), (1,2), (2,3), (3,4))
ここでは3
で1
と4
、2
をビュンされるというように:
今、私はこのようなジップオーバーRDD1とRDD2を適用しています。私は期待通りの結果を得るために、どのようにしてzip操作を適用すればよいですか?
に運ぶです。あなたが望むものとの違いは何ですか?単に 'map(x =>(x、x))'を使うだけです。 – Shaido
違いは、** RDD1 **と** RDD2 **を組み合わせなければならない点です。要素が一致する必要があります。基本的に、私はシャッフルせずにそれらに参加したいです。それで、私は彼らに 'zip'をやっているのです。 – himanshuIIITian
分散システムでは、各rddのパーティションを1つだけ定義し、1つのパーティションで実行する場合、分散システムの並列処理の利点はありません。 –