2017-06-10 7 views
2

私はJavaDStreamsで動作するアプリケーションを持っています。 これはコードの一部です。ここでは、単語が表示される頻度を計算します。JavaDStreamをソートする - Spark Streaming

JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
     new PairFunction<String, String, Integer>() { 
     @Override 
     public Tuple2<String, Integer> call(String s) { 
      return new Tuple2<>(s, 1); 
     } 
     }).reduceByKey(new Function2<Integer, Integer, Integer>() { 
     @Override 
     public Integer call(Integer i1, Integer i2) { 
      return i1 + i2; 
     } 
     }); 

、私はトップN整数値によってソート頻繁要素を、印刷することを望んだ場合、(JavaPairRDD用)sortByKeyのようなメソッドが存在しない場合、私はこれをどのように行うことができますか?

+0

自分でメソッドを実装することができます。 – Wang

+0

はい、私は回避策を考えましたが、JavaDStreamを使用することはできませんが、RDDだけになります。 – sirdan

+1

私はストリーミングのためにデータが絶えず来ていると思います。それを並べ替えるのは難しいです。 – Wang

答えて

3

JavaPairDStream<String, Integer>を持っていて、整数値でソートしたいので、最初にペアをスワップする必要があります。

JavaPairDStream<Integer,String> swappedPair = wordCounts.mapToPair(x -> x.swap()); 

今、あなたはtransformToPairを使用してソートしsortByKey機能を使用することができます。

JavaPairDStream<Integer,String> sortedStream = swappedPair.transformToPair(
    new Function<JavaPairRDD<Integer,String>, JavaPairRDD<Integer,String>>() { 
     @Override 
     public JavaPairRDD<Integer,String> call(JavaPairRDD<Integer,String> jPairRDD) throws Exception { 
        return jPairRDD.sortByKey(false); 
        } 
       }); 

sortedStream.print(); 
+0

ありがとう、これは私の問題を解決しました。 – sirdan

0

簡素化:

JavaPairDStream<String, Long> counts = lines.countByValue(); 
    JavaPairDStream<Long,String> swappedPair = counts.mapToPair(Tuple2::swap); 
    JavaPairDStream<Long,String> sortedStream = swappedPair.transformToPair(s -> s.sortByKey(false)); 
関連する問題