私はSPARKには新しく、Spark Java APIを検討しています。私はファイルを持っていますSpark Java APIの総給与計算方法
1201, John, 2500
1202, Alex, 2800
1203, amith, 3900
1204, javed, 2300
1205, Saminga, 23000
今私は合計給与を計算してファイルに保存する必要があります。私はMR/spark Java APIのために非常に新しいので、私はそれを理解することができませんでした。誰にでもこのことから私を助けることができますか?
サンプルコード:
import java.util.Arrays;
import java.util.Comparator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
public class SalarySum {
public static void main(String[] args)
{
final int k=0;
if(args.length<1)
{
System.out.println("Please provide input files for processing");
System.exit(0);
}
else
{
String inputFile=args[0];
String outputFile=args[1];
SparkConf config=new SparkConf().setAppName("Total Salary Example");
JavaSparkContext spartContext=new JavaSparkContext(config);
JavaRDD<String> inputReader=spartContext.textFile(inputFile);
JavaRDD<String> map=inputReader.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String t) throws Exception
{
System.out.println("Flat Map Data: "+t);
return Arrays.asList(t);
}
});
JavaPairRDD<Integer, Iterable<String>> group=map.groupBy(new Function<String, Integer>() {
@Override
public Integer call(String s2) throws Exception
{
String data=s2.split(",")[2].trim();
int value=Integer.parseInt(data);
System.out.println("Tuple: "+s2 +" : "+data);
return value;
}
});
JavaPairRDD<Integer, Integer> totalSaleData = group.flatMapValues(new Function<Iterable<String>, Iterable<Integer>>() {
@Override
public Iterable<Integer> call(Iterable<String> v1)
throws Exception
{
int count=0;
for(String str:v1)
{
String data=str.split(",")[2].trim();
int value=Integer.parseInt(data);
System.out.println("Iterating Values : "+str);
System.out.println("Count: "+count);
count =count+value;
}
return Arrays.asList(count);
}
});
totalSaleData.saveAsTextFile(outputFile);
}
}
}
入力ファイルはtxtまたはcsvですか?データフレームではなくRDDを使用しますか?ファイルに書き込まれる予想出力は何ですか? – abaghel
私の入力ファイルはテキストファイルです。私はRDDを使用する必要があり、outは完全な給与の合計でなければなりません。 – Navyah