RDDを値でソートしようとしています。複数の値が等しい場合は、これらの値を辞書編集でキーする必要があります。まずJavaPairRDDを値で並べ替えてからキーで並べ替えます。
コード:
JavaPairRDD <String,Long> rddToSort = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , String, Long >() {
@Override
public Tuple2 < String, Long > call(Tuple2 < String, MovieReview > t) throws Exception {
return new Tuple2 < String, Long > (t._1, t._2.count);
}
});
私がこれまで行ってtakeOrdered
を使用してCustomComperator
を提供し、このですが、takeOrdered
は大量のデータを扱うことができないので、コードを実行するときに、それが出続けている何を(それは)OSが処理できない大量のメモリを食べる:
List < Tuple2 < String, Long >> rddSorted = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , String, Long >() {
@Override
public Tuple2 < String, Long > call(Tuple2 < String, MovieReview > t) throws Exception {
return new Tuple2 < String, Long > (t._1, t._2.count);
}
}).takeOrdered(newTopMovies, MapLongValueComparator.VALUE_COMP);
Comperator:
static class MapLongValueComparator implements Comparator < Tuple2 < String, Long >> , Serializable {
private static final long serialVersionUID = 1L;
private static final MapLongValueComparator VALUE_COMP = new MapLongValueComparator();
@Override
public int compare(Tuple2 < String, Long > o1, Tuple2 < String, Long > o2) {
if (o1._2.compareTo(o2._2) == 0) {
return o1._1.compareTo(o2._1);
}
return -o1._2.compareTo(o2._2);
}
}
ERROR:
16/06/30 21:09:23 INFO scheduler.DAGScheduler: Job 18 failed: takeOrdered at MovieAnalyzer.java:708, took 418.149182 s
どのようにあなたは、このRDDを並べ替えるでしょうか? TopKMovies
の値を考慮する方法と、等価キーが辞書編集である場合はどうしますか?
ありがとうございました。
あなたは(スタックトレースを提供することができますもしあれば?)。あなたはそれがメモリの問題かもしれないと言いましたが、エラーメッセージは正確に何が起こったかを見ることができません。 – Serhiy
@ Serhiy私は、分散モードで大量のデータを処理しているので、takeOrdered操作には長い時間がかかりますので、メモリの問題だと思います。終了コードがあります。137&Exit code:1 他の方法で並べ替えに近づくと、問題は確実に解決されます。 –
データを再パーティション化しようとしましたか?ペアにマップすると、直後に再パーティションすることができます。 – Serhiy