2017-06-18 18 views
0

これをJavaで、そしてRDD APIを使用するだけで解決する必要があります。Apache Spark(Java)の複数の値でJavaRDDタプルを並べ替えます。

私は4つの値を持つJavaRDDタプル持っている:私は降順でダブル値でタプルをソートしたい

JavaRDD<Tuple4 <Integer, Double, Long, Integer>> revenue = ...; 

を。

2つのDoublesの値が同じ場合は、Long値で昇順にソートします。例えばので

は:

class TupleComparator implements Comparator<Tuple4<Integer, Double, Long, Integer>>, Serializable { 

     private static final long serialVersionUID = 1L; 

     @Override 
     public int compare(Tuple4<Integer, Double, Long, Integer> v1, 
     Tuple4<Integer, Double, Long, Integer> v2) { 

     if(v1._2().compareTo(v2._2()) == 0){ 
      return v1._3().compareTo(v2._3()); 
     } 
      return - v1._2().compareTo(v2._2()); 
     } 
    } 

しかし、順番に:これまでのところ私はこのようなカスタムComparatorを使用しようとした

(1, 5.1, 7, 10) 
    (1, 4.3, 4, 2) 
    (7, 4.3, 5, 9) 
    (3, 4.3, 8, 5) 
    (8, 1.2, 4, 7) 

(7, 4.3, 5, 9) 
    (1, 5.1, 7, 10) 
    (8, 1.2, 4, 7) 
    (1, 4.3, 4, 2) 
    (3, 4.3, 8, 5) 

にソートされていますカスタムコンパレータを使用するには、sortByKey関数を使用する必要があります。

私はキーを作る必要があります。 (通常のsortBy関数はコンパレータをとらないので)。

私は、このように私のコンパレータを適用しようとした場合:

revenue.keyBy(x -> x._2()).groupByKey().sortByKey(new TupleComparator(), false, 1); 

私が手: "メソッドsortByKeyを...(int型、TupleComparator、ブール値)の引数には適用されません"

これは私が立ち往生している場所です。私がやっていることが正しかったのか、それともコンパレータの働きをするのかは分かりません。 (私はカスタムコンパレータに慣れていません)。

これを実現するにはもっと良い方法がありますか?私はそれがScalaでもっと簡単であることを知っています。

しかし、私はJavaで、そしてRDD APIを使用するだけでそれを行う必要があります。

+0

のためであるあなたは、 'JavaRdd.sortBy'を使用し、最初の値' '-1 Tuple2'を返す関数を渡すことはできません*値2番目の値は 'value._3()'です。 –

+0

それは実際には素晴らしいと簡単なアイデアです。ありがとうございました! – Rhyzx

答えて

0

2段階で行う必要があります。まず、Double値の降順でRDDをソートする必要があります。

JavaRDD<Tuple4<Integer, Double, Long, Integer>> firstSortRDD = revenue.sortBy(new Function<Tuple4<Integer, Double, Long, Integer>, Integer>() { 
     @Override 
     public Integer call(Tuple4<Integer, Double, Long, Integer> value) throws Exception { 
      return value._2().intValue(); 
     } 
    }, false, 1); 

次の並べ替えは、Double値の順序に依存するLong値によって昇順に行われます。したがって、鍵Tuple2<Double,Long>を作成する必要があります。今すぐsortByKeyメソッドを使用し、比較のためにカスタムロジックを持つComparatorを渡します。

JavaRDD<Tuple4<Integer,Double,Long,Integer>> secondSortRDD = firstSortRDD.keyBy(new Function<Tuple4<Integer, Double, Long, Integer>, Tuple2<Double, Long>>(){ 
     @Override 
     public Tuple2<Double, Long> call(Tuple4<Integer, Double, Long, Integer> value) throws Exception { 
      return new Tuple2(value._2(),value._3()); 
     }}).sortByKey(new TupleComparator()).values(); 

そして、ここであなたのコンパレータのクラスはTuple2<Double,Long>

class TupleComparator implements Comparator<Tuple2<Double,Long>>, Serializable { 
private static final long serialVersionUID = 1L; 
@Override 
public int compare(Tuple2<Double, Long> v1, Tuple2<Double, Long> v2) { 
    if (v1._1().compareTo(v2._1()) == 0) { 
     return v1._2().compareTo(v2._2()); 
    } 
     return v2._2().compareTo(v1._2()); 
    } 
} 
+0

ありがとうございました!コンパレータを少し変更するだけでした。私は変更しました: 'return v2._2()。compareTo(v1._2());' は 'v2._1()を返します。_1()); ' それ以外の場合は常に、降順のLong値に従ってソートされます。 これで完全に動作します。ありがとうございました。 – Rhyzx

関連する問題