2016-12-12 12 views
0

私はSPARKには新しく、Spark Java APIを検討しています。私はファイルを持っていますSpark Java APIの総給与計算方法

1201, John, 2500 
1202, Alex, 2800 
1203, amith, 3900 
1204, javed, 2300 
1205, Saminga, 23000 

今私は合計給与を計算してファイルに保存する必要があります。私はMR/spark Java APIのために非常に新しいので、私はそれを理解することができませんでした。誰にでもこのことから私を助けることができますか?

サンプルコード:

import java.util.Arrays; 
import java.util.Comparator; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.DoubleFunction; 
import org.apache.spark.api.java.function.FlatMapFunction; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.api.java.function.Function2; 
import org.apache.spark.api.java.function.PairFunction; 

import scala.Tuple2; 
public class SalarySum { 

    public static void main(String[] args) 
    { 
     final int k=0; 

     if(args.length<1) 
     { 
      System.out.println("Please provide input files for processing"); 
      System.exit(0); 
     } 
     else 
     { 
      String inputFile=args[0]; 
      String outputFile=args[1]; 
      SparkConf config=new SparkConf().setAppName("Total Salary Example"); 
      JavaSparkContext spartContext=new JavaSparkContext(config); 

      JavaRDD<String> inputReader=spartContext.textFile(inputFile); 

      JavaRDD<String> map=inputReader.flatMap(new FlatMapFunction<String, String>() { 
       @Override 
       public Iterable<String> call(String t) throws Exception 
       { 
        System.out.println("Flat Map Data: "+t); 
        return Arrays.asList(t); 
       } 
      }); 

      JavaPairRDD<Integer, Iterable<String>> group=map.groupBy(new Function<String, Integer>() { 

       @Override 
       public Integer call(String s2) throws Exception 
       { 
        String data=s2.split(",")[2].trim(); 
        int value=Integer.parseInt(data); 
        System.out.println("Tuple: "+s2 +" : "+data); 
        return value; 
       } 
      }); 


      JavaPairRDD<Integer, Integer> totalSaleData = group.flatMapValues(new Function<Iterable<String>, Iterable<Integer>>() { 

       @Override 
       public Iterable<Integer> call(Iterable<String> v1) 
         throws Exception 
       { 
        int count=0; 
        for(String str:v1) 
        { 
         String data=str.split(",")[2].trim(); 
         int value=Integer.parseInt(data); 
         System.out.println("Iterating Values : "+str); 
         System.out.println("Count: "+count); 
         count =count+value; 
        } 
        return Arrays.asList(count); 
       } 
      }); 

      totalSaleData.saveAsTextFile(outputFile); 

     } 
    } 

} 
+0

入力ファイルはtxtまたはcsvですか?データフレームではなくRDDを使用しますか?ファイルに書き込まれる予想出力は何ですか? – abaghel

+0

私の入力ファイルはテキストファイルです。私はRDDを使用する必要があり、outは完全な給与の合計でなければなりません。 – Navyah

答えて

1

あなたはスパーク1.6を使用して、以下のようにそれを行うことができます。

public class SparkSalarySum { 
public static void main(String[] args) { 
    SparkConf conf = new SparkConf().setAppName("SparkSalarySum").setMaster("local[2]"); 
    JavaSparkContext jsc = new JavaSparkContext(conf); 
    JavaRDD<String> lines = jsc.textFile("c:\\temp\\test.txt"); 
    JavaPairRDD<String, Integer> total = lines.flatMap(line -> Arrays.asList(Integer.parseInt(line.split(",")[2].trim()))) 
      .mapToPair(sal -> new Tuple2<String, Integer>("Total", sal)) 
      .reduceByKey((x, y) -> x + y); 
    total.foreach(data -> { 
     System.out.println(data._1()+"-"+data._2()); 
    }); 
    total.coalesce(1).saveAsTextFile("c:\\temp\\testOut"); 
    jsc.stop(); 
    } 
} 
+0

こんにちは、JavaSparkContext APIで同じコードを書いています。私はスパーク1.6のバージョンを使用しています – Navyah

+0

スパーク1.6の私の答えを更新しました。 – abaghel

+0

しかし、このコードを使ってテキストファイルに結果を格納すると、ファイルに結果を格納するメソッドを見ることができませんでした。 – Navyah