2017-11-30 5 views
0

新しいDataset APIを使用してJavaでPageRankのthis exampleを実装しました。古いRDD APIを使用しているサンプルに対して自分のコードをベンチマークすると、自分のコードは186秒かかりますが、ベースラインはわずか109秒しかかかりません。不一致の原因は何ですか? (余談:?それはデータベースが唯一のエントリの一握りが含まれている場合でも、数百秒を取るためにスパークのため正常です)このPageRankジョブは、RDDよりもデータセットを使用する方がずっと遅いのはなぜですか?

マイコード:

Dataset<Row> outLinks = spark.read().jdbc("jdbc:postgresql://127.0.0.1:5432/postgres", "storagepage_outlinks", props); 
Dataset<Row> page = spark.read().jdbc("jdbc:postgresql://127.0.0.1:5432/postgres", "pages", props); 

outLinks = page.join(outLinks, page.col("id").equalTo(outLinks.col("storagepage_id"))); 
outLinks = outLinks.distinct().groupBy(outLinks.col("url")).agg(collect_set("outlinks")).cache(); 

Dataset<Row> ranks = outLinks.map(row -> new Tuple2<>(row.getString(0), 1.0), Encoders.tuple(Encoders.STRING(), Encoders.DOUBLE())).toDF("url", "rank"); 

for (int i = 0; i < iterations; i++) { 
    Dataset<Row> joined = outLinks.join(ranks, new Set.Set1<>("url").toSeq()); 
    Dataset<Row> contribs = joined.flatMap(row -> { 
     List<String> links = row.getList(1); 
     double rank = row.getDouble(2); 
     return links 
       .stream() 
       .map(s -> new Tuple2<>(s, rank/links.size())) 
       .collect(Collectors.toList()).iterator(); 
    }, Encoders.tuple(Encoders.STRING(), Encoders.DOUBLE())).toDF("url", "num"); 

    Dataset<Tuple2<String, Double>> reducedByKey = 
      contribs.groupByKey(r -> r.getString(0), Encoders.STRING()) 
      .mapGroups((s, iterator) -> { 
       double sum = 0; 
       while (iterator.hasNext()) { 
        sum += iterator.next().getDouble(1); 
       } 
       return new Tuple2<>(s, sum); 
      }, Encoders.tuple(Encoders.STRING(), Encoders.DOUBLE())); 
    ranks = reducedByKey.map(t -> new Tuple2<>(t._1, .15 + t._2 * .85), 
      Encoders.tuple(Encoders.STRING(), Encoders.DOUBLE())).toDF("url", "rank"); 
} 
ranks.show(); 

RDDを使用するサンプルコード(に適応私のデータベースから読み取る):

Dataset<Row> outLinks = spark.read().jdbc("jdbc:postgresql://127.0.0.1:5432/postgres", "storagepage_outlinks", props); 
Dataset<Row> page = spark.read().jdbc("jdbc:postgresql://127.0.0.1:5432/postgres", "pages", props); 

outLinks = page.join(outLinks, page.col("id").equalTo(outLinks.col("storagepage_id"))); 
outLinks = outLinks.distinct().groupBy(outLinks.col("url")).agg(collect_set("outlinks")).cache(); // TODO: play with this cache 
JavaPairRDD<String, Iterable<String>> links = outLinks.javaRDD().mapToPair(row -> new Tuple2<>(row.getString(0), row.getList(1))); 

// Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one. 
JavaPairRDD<String, Double> ranks = links.mapValues(rs -> 1.0); 

// Calculates and updates URL ranks continuously using PageRank algorithm. 
for (int current = 0; current < 20; current++) { 
    // Calculates URL contributions to the rank of other URLs. 
    JavaPairRDD<String, Double> contribs = links.join(ranks).values() 
      .flatMapToPair(s -> { 
       int urlCount = size(s._1()); 
       List<Tuple2<String, Double>> results = new ArrayList<>(); 
       for (String n : s._1) { 
        results.add(new Tuple2<>(n, s._2()/urlCount)); 
       } 
       return results.iterator(); 
      }); 

    // Re-calculates URL ranks based on neighbor contributions. 
    ranks = contribs.reduceByKey((x, y) -> x + y).mapValues(sum -> 0.15 + sum * 0.85); 
} 

// Collects all URL ranks and dump them to console. 
List<Tuple2<String, Double>> output = ranks.collect(); 
for (Tuple2<?,?> tuple : output) { 
    System.out.println(tuple._1() + " has rank: " + tuple._2() + "."); 
} 
+0

データセットのパフォーマンスが低下することが知られています。代わりにDataFrameを使用してみましたか? – LuckyGuess

答えて

1

TL; DRそれはおそらく、古き良きAvoid GroupByKeyものです。ハード

は確かに言うことが、あなたの Datasetコードが groupByKeyのと同等です。

groupByKey(...).mapGroups(...) 

それは最初にシャッフルすることを意味し、そのデータを低減します。

あなたのRDDはreduceByKeyを使用しています - これは、ローカル削減を適用してシャッフルサイズを減らすはずです。このコードを幾分同等にしたい場合は、groupByKey(...).mapGroups(...)groupByKey(...).reduceGroups(...)に書き換える必要があります。

もう1つの可能な候補は構成です。 spark.sql.shuffle.partitionsのデフォルト値は200で、Datasetアグリゲーションに使用されます。場合

データベースにはいくつかのエントリが含まれていますか?

これは重大な過度の攻撃です。

RDDは、親データに基づいてspark.default.parallelismまたは値を使用します(通常ははるかに控えめです)。

+0

'mapGroups'を' reduceGroups'に置き換えますか? – user8371915

関連する問題