2016-07-06 22 views
0

と一致した場合にスパークで2 RDDSをマージするためにどのように私は2 RDDs考えてみましょう:キーに格納された値が

rdd1 = [ (key1, value1), (key2, value2), (key3, value3) ] 

rdd2 = [ (key4, value4), (key5, value5), (key6, value6) ] 

を私はRDDSをマージしたい場合にのみRDD1 =にキー1に格納された値= rdd2のkey5に格納されている値。

JavaまたはScalaを使用してSparkでこれを行う方法を教えてください。

答えて

0

私はあなたがジョインを探していると思います。

まず、key1、key2などをキーとしてPairRDDにマッピングする必要があります。この例では、入力としてTuple2を使用しています:あなたは両方をマッピングしたら

JavaPairRDD<Integer, String> pairRdd = rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, Integer, String>() { 
    public Tuple2<Integer, String> call(Tuple2<Integer, String> val) throws Exception { 
     return new Tuple2<Integer, String>(val._1(), val._2()); 
    } 
}); 

、あなただけのキーによってそれらを結合する必要があります。

その後
JavaPairRDD<Integer, Tuple2<String, String>> combined = pairRdd.join(pairRdd2); 

を組み合わせたようなものになります。

[ (key1, (value1, value5)), (key2, (value2, value4)) ] 

どこkey1 == key5とkey2 == key4

0

私はあなたに以下のようにスカラ火花のソリューションを与える

scala> val rdd1 = sc.parallelize(List((3,"s"),(2,"df"),(1,"i"))) 
scala> val rdd2 = sc.parallelize(List((1,"ds"),(2,"h"),(1,"i"))) 
scala> val swaprdd1=rdd1.map(_.swap) 
scala> val swaprdd2=rdd2.map(_.swap) 
scala> val intersectrdd = rdd1.intersection(rdd2) 
scala> val resultrdd = intersectrdd.map(_.swap) 

私はあなたのソリューションのためにその便利を願っています:)

関連する問題