2016-06-30 6 views
11

RDDを値でソートしようとしています。複数の値が等しい場合は、これらの値を辞書編集でキーする必要があります。まずJavaPairRDDを値で並べ替えてからキーで並べ替えます。

コード:

JavaPairRDD <String,Long> rddToSort = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , String, Long >() { 

    @Override 
    public Tuple2 < String, Long > call(Tuple2 < String, MovieReview > t) throws Exception { 
     return new Tuple2 < String, Long > (t._1, t._2.count); 
    } 
}); 

私がこれまで行ってtakeOrderedを使用してCustomComperatorを提供し、このですが、takeOrderedは大量のデータを扱うことができないので、コードを実行するときに、それが出続けている何を(それは)OSが処理できない大量のメモリを食べる:

List < Tuple2 < String, Long >> rddSorted = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , String, Long >() { 

    @Override 
    public Tuple2 < String, Long > call(Tuple2 < String, MovieReview > t) throws Exception { 
     return new Tuple2 < String, Long > (t._1, t._2.count); 
    } 
}).takeOrdered(newTopMovies, MapLongValueComparator.VALUE_COMP); 

Comperator:

static class MapLongValueComparator implements Comparator < Tuple2 < String, Long >> , Serializable { 
     private static final long serialVersionUID = 1L; 

     private static final MapLongValueComparator VALUE_COMP = new MapLongValueComparator(); 

     @Override 
     public int compare(Tuple2 < String, Long > o1, Tuple2 < String, Long > o2) { 
      if (o1._2.compareTo(o2._2) == 0) { 
       return o1._1.compareTo(o2._1); 
      } 
      return -o1._2.compareTo(o2._2); 
     } 
} 

ERROR:

16/06/30 21:09:23 INFO scheduler.DAGScheduler: Job 18 failed: takeOrdered at MovieAnalyzer.java:708, took 418.149182 s 

どのようにあなたは、このRDDを並べ替えるでしょうか? TopKMoviesの値を考慮する方法と、等価キーが辞書編集である場合はどうしますか?

ありがとうございました。

+0

あなたは(スタックトレースを提供することができますもしあれば?)。あなたはそれがメモリの問題かもしれないと言いましたが、エラーメッセージは正確に何が起こったかを見ることができません。 – Serhiy

+0

@ Serhiy私は、分散モードで大量のデータを処理しているので、takeOrdered操作には長い時間がかかりますので、メモリの問題だと思います。終了コードがあります。137&Exit code:1 他の方法で並べ替えに近づくと、問題は確実に解決されます。 –

+0

データを再パーティション化しようとしましたか?ペアにマップすると、直後に再パーティションすることができます。 – Serhiy

答えて

3

は、&パーティションコンパレータでsortByKeyを使用して問題を解決し

JavaPairRDD <Tuple2<String,Long>, Long> sortedRdd = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , Tuple2<String,Long>, Long >() { 

    @Override 
    public Tuple2 < Tuple2<String,Long>, Long > call(Tuple2 < String, MovieReview > t) throws Exception { 
     return new Tuple2 < Tuple2<String,Long>, Long > (new Tuple2<String,Long>(t._1,t._2.count), t._2.count); 
    } 
}).sortByKey(new TupleMapLongComparator(), true, 100); 


JavaPairRDD <String,Long> sortedRddToPairs = sortedRdd.mapToPair(new PairFunction<Tuple2<Tuple2<String,Long>,Long>, String, Long>() { 

    @Override 
    public Tuple2<String, Long> call(
      Tuple2<Tuple2<String, Long>, Long> t) throws Exception { 
     return new Tuple2 < String, Long > (t._1._1, t._1._2); 
    } 

}); 
PairRDD

< Tuple2<String,Long> , Long><String, Long> PairRDDをmaping後コンパレータ:

private class TupleMapLongComparator implements Comparator<Tuple2<String,Long>>, Serializable { 
    @Override 
    public int compare(Tuple2<String,Long> tuple1, Tuple2<String,Long> tuple2) { 

     if (tuple1._2.compareTo(tuple2._2) == 0) { 
      return tuple1._1.compareTo(tuple2._1); 
     } 
     return -tuple1._2.compareTo(tuple2._2); 
    } 
} 
関連する問題