2016-04-29 12 views
0

私のプロジェクトは、最大、最小、平均温度を表示することです。私はすでにそれをしていますが、私はこの機能をキーでグループ化して表示する必要があります。私のアプリケーションには、年、月、日、都市の4つのラジオボタンがあります。 1つを選択すると、集計関数の入力を求められます(max、min、avg)。これらについては私のCompositeGroupKeyクラスを変更する必要がありますが、私はそれについては何も考えていません。だから私を助けてください、そして、コードで行われる必要がある変更についてのインプットを提供してください。hadoopを使用して最大、最小、平均温度を表示したい

ドライバ:

import org.apache.hadoop.io.*; 
import org.apache.hadoop.fs.*; 
import org.apache.hadoop.mapreduce.*; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

public class MaxTemperature 
{    
      public static void Main (String[] args) throws Exception 
      { 
         if (args.length != 2) 
         { 
           System.err.println("Please Enter the input and output parameters"); 
           System.exit(-1); 
         } 

         Job job = new Job(); 
         job.setJarByClass(MaxTemperature.class); 
         job.setJobName("Max temperature"); 

         FileInputFormat.addInputPath(job,new Path(args[0])); 
         FileOutputFormat.setOutputPath(job,new Path (args[1])); 

         job.setMapperClass(MaxTemperatureMapper.class); 
         job.setReducerClass(MaxTemperatureReducer.class); 

         job.setMapOutputKeyClass(CompositeGroupKey.class); 
         job.setMapOutputValueClass(IntWritable.class); 

         job.setOutputKeyClass(CompositeGroupKey.class); 
         job.setOutputValueClass(DoubleWritable.class); 


         System.exit(job.waitForCompletion(true)?0:1); 
      } 
} 

マッパー:

import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.*; 
import java.io.IOException; 

public class MaxTemperatureMapper extends Mapper <LongWritable, Text, CompositeGroupKey, IntWritable> 
{ 
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
    { 
    String line = value.toString(); 
    int year = Integer.parseInt(line.substring(0,4)); 
    String mnth = line.substring(7,10); 
    int date = Integer.parseInt(line.substring(10,12)); 
    int temp= Integer.parseInt(line.substring(12,14)); 

    CompositeGroupKey cntry = new CompositeGroupKey(year,mnth, date); 


    context.write(cntry, new IntWritable(temp)); 
      } 
} 

減速:

import org.apache.hadoop.io.DoubleWritable; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.mapreduce.*; 
import java.io.IOException; 


public class MaxTemperatureReducer extends Reducer <CompositeGroupKey, IntWritable, CompositeGroupKey, CompositeGroupkeyall >{ 


    public void reduce(CompositeGroupKey key, Iterable<IntWritable> values , Context context) throws IOException,InterruptedException 
      { 


      Double max = Double.MIN_VALUE; 
      Double min =Double.MAX_VALUE; 

      for (IntWritable value : values ) 
      {    
       min = Math.min(min, value.get()); 
       max = Math.max(max, value.get()); 

      } 

      CompositeGroupkeyall val =new CompositeGroupkeyall(max,min); 
      context.write(key, val); 
      } 
} 

そして、複合キー:

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 

import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.io.WritableUtils; 

class CompositeGroupKey implements WritableComparable<CompositeGroupKey> { 
    int year; 
    String mnth; 
    int date; 
    CompositeGroupKey(int y, String c, int d){ 
     year = y; 
     mnth = c; 
     date = d; 
    } 
    CompositeGroupKey(){} 

    public void write(DataOutput out) throws IOException { 
     out.writeInt(year); 
     WritableUtils.writeString(out, mnth); 
     out.writeInt(date); 
     } 
    public void readFields(DataInput in) throws IOException { 
     this.year = in.readInt(); 
     this.mnth = WritableUtils.readString(in); 
     this.date = in.readInt(); 
     } 
    public int compareTo(CompositeGroupKey pop) { 
     if (pop == null) 
      return 0; 
     int intcnt; 
     intcnt = Integer.valueOf(year).toString().compareTo(Integer.valueOf(pop.year).toString()); 
     if(intcnt != 0){ 
      return intcnt; 
     }else if(mnth.compareTo(pop.mnth) != 0){ 
      return mnth.compareTo(pop.mnth); 
     }else{ 
      return Integer.valueOf(date).toString().compareTo(Integer.valueOf(pop.date).toString()); 
     } 
    } 
    public String toString() { 
     return year + " :" + mnth.toString() + " :" + date; 
     } 
} 









import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 

import org.apache.hadoop.io.WritableComparable; 
class CompositeGroupkeyall implements WritableComparable<CompositeGroupkeyall> { 
    Double max; 
    Double min; 

    CompositeGroupkeyall(double x, double y){ 
     max = x ; 
     min = y ; 
    } 
    CompositeGroupkeyall(){} 

    public void readFields(DataInput in) throws IOException { 
     this.max = in.readDouble(); 
     this.min = in.readDouble(); 
    } 

    public void write(DataOutput out) throws IOException { 
     out.writeDouble(max); 

     out.writeDouble(min); 


     } 

    public int compareTo(CompositeGroupkeyall arg0) { 
     return -1; 
    } 

    public String toString() { 
     return max + " " + min +" " ; 
     } 
} 

答えて

0

以下のようにさらに多くのキー値のペアを作成し、同じレデューサーにデータを処理させることができます。同じレデューサーによってすべての日付/月/年が処理されます。

CompositeGroupKey cntry = new CompositeGroupKey(year, mnth, date); 
CompositeGroupKey cntry_date = new CompositeGroupKey((int)0, "ALL", date); 
CompositeGroupKey cntry_mnth = new CompositeGroupKey((int)0, mnth, (int) 1); 
CompositeGroupKey cntry_year = new CompositeGroupKey(year, "ALL", (int) 1); 

context.write(cntry, new IntWritable(temp)); 
context.write(cntry_date, new IntWritable(temp)); 
context.write(cntry_mnth, new IntWritable(temp)); 
context.write(cntry_year, new IntWritable(temp)); 
関連する問題