2017-11-09 15 views
1

同じオブジェクトの2つのJavaRDDがあるので、データを1つにまとめる必要があります。 これらは次のとおりです。スパーク:2つのJavaオブジェクトRDDを1つに結合する

ドメイン

public class User { 
    String name; 
    String email; 
    String profession; 
    Integer age; 

    // constructor 

    // setters and getters 
} 

RDD 1

User user1 = new User ("Name", "[email protected]"); 
User user2 = new User ("Name2", "[email protected]"); 

List<User> userList = new ArrayList<>(); 
userList.add(user1); 
userList.add(user2); 

JavaRDD<User> leftUserJavaRDD = sc.parallelize(userList); 

RDD 2

User user3 = new User ("[email protected]", "Software Engineer", 26); 
User user4 = new User ("[email protected]", "Lawyer", 35); 

List<User> userList2 = new ArrayList<>(); 
userList.add(user3); 
userList.add(user4); 

JavaRDD<User> rightUserJavaRDD = sc.parallelize(userList2); 

私は、一般的な電子メールアドレスを持つ2つのRDDを組み合わせたいです。私は期待したい 組み合わせRDDは次のとおりです。

User user1and3 = new User (
     "Name", 
     "[email protected]", 
     "Software Engineer", 
     26); 

User user2and4 = new User (
     "Name2", 
     "[email protected]", 
     "Lawyer", 
     35); 

は、どのように私はJavaを使用してスパークでこれを行うことができますか? unioncartesianを試しましたが、うまくいきませんでした。

答えて

1

私は同僚の助力を得て、私たちが得た解決策はここにあります。

import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.function.Function2; 
import scala.Tuple2; 

import java.util.List; 

public JavaRDD<User> getCombinedUsers(JavaRDD<User> leftUserJavaRDD, JavaRDD<User> rightUserJavaRDD) { 

    JavaPairRDD<String, User> leftUserJavaPairRDD = 
       leftUserJavaRDD.mapToPair(user -> new Tuple2<>(user.getEmail(), user)); 

    JavaPairRDD<String, User> rightUserJavaPairRDD = 
       rightUserJavaRDD.mapToPair(user -> new Tuple2<>(user.getEmail(), user)); 

    return leftUserJavaPairRDD 
       .union(rightUserJavaPairRDD) 
       .reduceByKey(merge).values(); 
} 

/** 
* Reduce Function for merging User with no profession and age information with the one that has profession and age information. 
*/ 
private static Function2<User, User, User> merge = 
      (User left, User right) -> 
        new User(left.getName(), left.getEmail(), right.getProfession(), right.getAge()); 
関連する問題