2016-06-01 13 views
2

私はsparkを学んでおり、以下の問題を解決するための最良の方法を模索したいと考えています。Java Sparkでの重複フィルタリングのための最善のアプローチJavaPairRDD

私は以下のように2つのデータセットuserstransactionsを持っており、それらを結合して1アイテムあたりのユニークな場所を見つけることを希望します。ファイルのヘッダーが

以下
id,email,language,location ----------- USER HEADERS 
txid,productid,userid,price,desc -------------------- TRANSACTION HEADERS 

以下の通りです

は私がこの正しい方法を行って、まだ感じて「行うことができたことを非常に確認していない

/* 
     * Load user data set into userDataFrame 
     * Load transaction data set into transactionDataFrame 
     * join both on user id - userTransactionFrame 
     * select productid and location columns from the joined dataset into a new dataframe - productIdLocationDataFrame 
     * convert the new dataframe into a javardd - productIdLocationJavaRDD 
     * make the javardd a pair rdd - productIdLocationJavaPairRDD 
     * group the pair rdd by key - productLocationList 
     * apply mapvalues on the grouped key to convert the list of values to a set of valued for duplicate filtering - productUniqLocations 
     * 
     * */ 

私のアプローチでありますより良い、異なって」。

私はJavaPairRDDから重複フィルタリングを行った部分が疑わしいです。

アプローチとコードを評価し、より良い解決策を教えてください。

コード

SparkConf conf = new SparkConf(); 
    conf.setAppName("Sample App - Uniq Location per item"); 
    JavaSparkContext jsc = new JavaSparkContext("local[*]","A 1"); 
    //JavaSparkContext jsc = new JavaSparkContext(conf); 
    SQLContext sqlContext = new SQLContext(jsc); 

    //id email language location ----------- USER HEADERS 
    DataFrame userDataFrame = sqlContext.read() 
      .format("com.databricks.spark.csv") 
      .option("inferSchema", "true") 
      .option("header", "true") 
      .option("delimiter", "\t") 
      .load("user"); 

    //txid pid uid price desc -------------------- TRANSACTION HEADERS 
    DataFrame transactionDataFrame = sqlContext.read() 
      .format("com.databricks.spark.csv") 
      .option("inferSchema", "true") 
      .option("header", "true") 
      .option("delimiter", "\t") 
      .load("transactions"); 

    Column joinColumn = userDataFrame.col("id").equalTo(transactionDataFrame.col("uid")); 

    DataFrame userTransactionFrame = userDataFrame.join(transactionDataFrame,joinColumn,"rightouter"); 

    DataFrame productIdLocationDataFrame = userTransactionFrame.select(userTransactionFrame.col("pid"),userTransactionFrame.col("location")); 

    JavaRDD<Row> productIdLocationJavaRDD = productIdLocationDataFrame.toJavaRDD(); 

    JavaPairRDD<String, String> productIdLocationJavaPairRDD = productIdLocationJavaRDD.mapToPair(new PairFunction<Row, String, String>() { 

     public Tuple2<String, String> call(Row inRow) throws Exception { 
      return new Tuple2(inRow.get(0),inRow.get(1)); 
     } 
    }); 


    JavaPairRDD<String, Iterable<String>> productLocationList = productIdLocationJavaPairRDD.groupByKey(); 

    JavaPairRDD<String, Iterable<String>> productUniqLocations = productLocationList.mapValues(new Function<Iterable<String>, Iterable<String>>() { 

     public Iterable<String> call(Iterable<String> inputValues) throws Exception { 
      return new HashSet<String>((Collection<? extends String>) inputValues); 
     } 
    }); 

    productUniqLocations.saveAsTextFile("uniq"); 

良い部分は、コードが実行されると、私は期待して出力を生成していることです。

答えて

2

getting ridは、groupByKeyである。

aggregateByKeyを使用すると、値の出力タイプが異なります(キーごとにセットが必要です)。 Scalaでは

コード:

pairRDD.aggregateByKey(new java.util.HashSet[String]) 
((locationSet, location) => {locationSet.add(location); locationSet}, 
(locSet1, locSet2) => {locSet1.addAll(locSet2); locSet1} 
) 

のJava等価:

Function2<HashSet<String>, String, HashSet<String>> sequenceFunction = new Function2<HashSet<String>, String, HashSet<String>>() { 

      public HashSet<String> call(HashSet<String> aSet, String arg1) throws Exception { 
       aSet.add(arg1); 
       return aSet; 
      } 
     }; 

     Function2<HashSet<String>, HashSet<String>, HashSet<String>> combineFunc = new Function2<HashSet<String>, HashSet<String>, HashSet<String>>() { 

      public HashSet<String> call(HashSet<String> arg0, HashSet<String> arg1) throws Exception { 
       arg0.addAll(arg1); 
       return arg0; 
      } 
     }; 

     JavaPairRDD<String, HashSet<String>> byKey = productIdLocationJavaPairRDD.aggregateByKey(new HashSet<String>(), sequenceFunction, combineFunc); 


は第二に、データセットが共同分割されたときに最適な動作を結合します。

データフレームを扱っているので、Spark < 1.6を使用している場合、すぐにパーティショニングするのはnot possibleです。したがって、データをRDDに読み込んでパーティション分割し、データフレームを作成することができます。あなたのユースケースでは、データフレームをまったく含まない方が良いかもしれません。

関連する問題