私はsparkを学んでおり、以下の問題を解決するための最良の方法を模索したいと考えています。Java Sparkでの重複フィルタリングのための最善のアプローチJavaPairRDD
私は以下のように2つのデータセットusers
とtransactions
を持っており、それらを結合して1アイテムあたりのユニークな場所を見つけることを希望します。ファイルのヘッダーが
id,email,language,location ----------- USER HEADERS
txid,productid,userid,price,desc -------------------- TRANSACTION HEADERS
以下の通りです
は私がこの正しい方法を行って、まだ感じて「行うことができたことを非常に確認していない
/*
* Load user data set into userDataFrame
* Load transaction data set into transactionDataFrame
* join both on user id - userTransactionFrame
* select productid and location columns from the joined dataset into a new dataframe - productIdLocationDataFrame
* convert the new dataframe into a javardd - productIdLocationJavaRDD
* make the javardd a pair rdd - productIdLocationJavaPairRDD
* group the pair rdd by key - productLocationList
* apply mapvalues on the grouped key to convert the list of values to a set of valued for duplicate filtering - productUniqLocations
*
* */
私のアプローチでありますより良い、異なって」。
私はJavaPairRDDから重複フィルタリングを行った部分が疑わしいです。
アプローチとコードを評価し、より良い解決策を教えてください。
コード
SparkConf conf = new SparkConf();
conf.setAppName("Sample App - Uniq Location per item");
JavaSparkContext jsc = new JavaSparkContext("local[*]","A 1");
//JavaSparkContext jsc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(jsc);
//id email language location ----------- USER HEADERS
DataFrame userDataFrame = sqlContext.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.option("delimiter", "\t")
.load("user");
//txid pid uid price desc -------------------- TRANSACTION HEADERS
DataFrame transactionDataFrame = sqlContext.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.option("delimiter", "\t")
.load("transactions");
Column joinColumn = userDataFrame.col("id").equalTo(transactionDataFrame.col("uid"));
DataFrame userTransactionFrame = userDataFrame.join(transactionDataFrame,joinColumn,"rightouter");
DataFrame productIdLocationDataFrame = userTransactionFrame.select(userTransactionFrame.col("pid"),userTransactionFrame.col("location"));
JavaRDD<Row> productIdLocationJavaRDD = productIdLocationDataFrame.toJavaRDD();
JavaPairRDD<String, String> productIdLocationJavaPairRDD = productIdLocationJavaRDD.mapToPair(new PairFunction<Row, String, String>() {
public Tuple2<String, String> call(Row inRow) throws Exception {
return new Tuple2(inRow.get(0),inRow.get(1));
}
});
JavaPairRDD<String, Iterable<String>> productLocationList = productIdLocationJavaPairRDD.groupByKey();
JavaPairRDD<String, Iterable<String>> productUniqLocations = productLocationList.mapValues(new Function<Iterable<String>, Iterable<String>>() {
public Iterable<String> call(Iterable<String> inputValues) throws Exception {
return new HashSet<String>((Collection<? extends String>) inputValues);
}
});
productUniqLocations.saveAsTextFile("uniq");
良い部分は、コードが実行されると、私は期待して出力を生成していることです。