2016-05-29 11 views
2

私は私の結果は になりたい、このようになりますJavaRDDを持ってJava Spark RDDで標準偏差と平均操作を実行するにはどうすればよいですか?

[ 
[A,8] 
[B,3] 
[C,5] 
[A,2] 
[B,8] 
... 
... 
] 

は、私が唯一のJava RDDSを使用して、これをどのように行うのです

[ 
[A,5] 
[B,5.5] 
[C,5] 
] 

を意味します。 P.S:私はgroupBy操作を避けたいので、私はDataFramesを使用していません。

+0

DataFrame'が見'上:http://stackoverflow.com/q/32902982/1560062。残りの部分:http://stackoverflow.com/q/35780331/1560062およびhttp://stackoverflow.com/a/35407148/1560062 – zero323

答えて

2

あなたが行く: `groupBy`について

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.util.StatCounter; 
import scala.Tuple2; 
import scala.Tuple3; 

import java.util.Arrays; 
import java.util.List; 

public class AggregateByKeyStatCounter { 

    public static void main(String[] args) { 

    SparkConf conf = new SparkConf().setAppName("AggregateByKeyStatCounter").setMaster("local"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 

    List<Tuple2<String, Integer>> myList = Arrays.asList(new Tuple2<>("A", 8), new Tuple2<>("B", 3), new Tuple2<>("C", 5), 
      new Tuple2<>("A", 2), new Tuple2<>("B", 8)); 

    JavaRDD<Tuple2<String, Integer>> data = sc.parallelize(myList); 
    JavaPairRDD<String, Integer> pairs = JavaPairRDD.fromJavaRDD(data); 

    /* I'm actually using aggregateByKey to perform StatCounter 
     aggregation, so actually you can even have more statistics available */ 
    JavaRDD<Tuple3<String, Double, Double>> output = pairs 
         .aggregateByKey(
          new StatCounter(), 
          StatCounter::merge, 
          StatCounter::merge) 
         .map(x -> new Tuple3<String, Double, Double>(x._1(), x._2().stdev(), x._2().mean())); 

    output.collect().forEach(System.out::println); 
    } 

} 
0

reduceByKeyを使用して、キーごとに合計とカウントを計算し、次に各キーごとに次のように分割することができます。ここで

val means: RDD[(String, Double)] = rdd 
.map(x => (x._1, (x._2, 1))) // add 1 for each element for the count 
.reduceByKey((a,b) => (a._1+b._1, a._2+b._2)) // create a tuple (count, sum) for each key 
.map{ case (k, v) => (k, v._1/v._2) } // calculate mean for each key 
関連する問題