2017-02-09 17 views
1

私はHadoopを使い始めたばかりで、基本的なMapReduceプログラムをいくつか試しています。その中で奇妙な問題に遭遇しました。Mapperの出力がReducerを経由せず、そのまま出力ファイルに書き込まれているようなのです。 Mapperの出力が出力ファイルに直接出力されます

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 
import java.util.HashMap; 
import java.util.Iterator; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import org.apache.hadoop.util.GenericOptionsParser; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 



class WeatherRecord implements Writable { 
    public DoubleWritable maxSum; // running sum of TMAX records 
    public IntWritable maxCount; // running count of TMAX records 
    public DoubleWritable minSum; // running sum of TMIN records 
    public IntWritable minCount; // running count of TMIN records 

    // default constructor 
    public WeatherRecord(){ 

     maxSum = new DoubleWritable(); 
     maxCount= new IntWritable(); 
     minSum = new DoubleWritable(); 
     minCount = new IntWritable(); 
    } 

    // custom constructor 
    public WeatherRecord(DoubleWritable ms, IntWritable mc, DoubleWritable ms1, IntWritable mc1){ 
     maxSum = ms; 
     maxCount= mc; 
     minSum = ms1; 
     minCount = mc1; 
    } 

    /* Getter and setter Methods*/ 



    //method to get running total of temperature 
    public double getMaxSum(){ 
     return Double.parseDouble(maxSum.toString()); 
    } 

    //method to get running total of temperature 
    public double getMinSum(){ 
     return Double.parseDouble(minSum.toString()); 
    } 

    //method to get Count 
    public int getMaxCount(){ return Integer.parseInt(maxCount.toString());} 

    //method to get Count 
    public int getMinCount(){ return Integer.parseInt(minCount.toString());} 

    // method to set count 
    public void setMaxCount(int c){ 
     maxCount = new IntWritable(c); 
    } 

    // method to set count 
    public void setMinCount(int c){ 
     minCount = new IntWritable(c); 
    } 

    //method to set reading sum 
    public void setMaxSum(double r){ 
     maxSum = new DoubleWritable(r); 
    } 

    //method to set reading sum 
    public void setMinSum(double r){ 
     minSum = new DoubleWritable(r); 
    } 

    // method to serialize object 
    public void write(DataOutput dataOutput) throws IOException { 
     maxSum.write(dataOutput); 
     maxCount.write(dataOutput); 
     minSum.write(dataOutput); 
     minCount.write(dataOutput); 
    } 

    //method to deserialize object 
    public void readFields(DataInput dataInput) throws IOException { 
     maxSum.readFields(dataInput); 
     maxCount.readFields(dataInput); 
     minSum.readFields(dataInput); 
     minCount.readFields(dataInput); 
    } 
} 


public class WeatherDriver extends Configured implements Tool{ 

    public static class WeatherMap extends Mapper<LongWritable, Text, Text,WeatherRecord > { 

     HashMap<String,WeatherRecord> recordMap= new HashMap<String,WeatherRecord>(); 

     protected void map(LongWritable key, Text value, Mapper.Context context) { 
      //the individual records from csv file is split based on ',' 
      String[] record = value.toString().split(","); 

      //station-id is the first field in the file 
      String stationId = record[0]; 

      //record-type(TMAX,TMIN,..) is the third field in the csv file 
      String type = record[2]; 

      //temperature readings are fourth column in the csv file 
      double temperature = Double.parseDouble(record[3]); 


      if(type.equalsIgnoreCase("TMAX") || type.equalsIgnoreCase("TMIN")){ 

       if(recordMap.containsKey(stationId)){ 
        WeatherRecord w = recordMap.get(stationId); 
        if(type.equalsIgnoreCase("TMAX")){ 
         w.setMaxCount(1 + w.getMaxCount()); 
         w.setMaxSum(w.getMaxSum() + temperature); 
        } 
        else if(type.equalsIgnoreCase("TMIN")){ 
         w.setMinCount(1+w.getMinCount()); 
         w.setMinSum(w.getMinSum() + temperature); 
        } 
        recordMap.put(stationId,w); 
       } 
       else{ 
        if(type.equalsIgnoreCase("TMAX")){ 
         recordMap.put(stationId, new WeatherRecord(new DoubleWritable(temperature), new IntWritable(1), 
           new DoubleWritable(0), new IntWritable(0))); 
        } 
        else if(type.equalsIgnoreCase("TMIN")){ 
         recordMap.put(stationId, new WeatherRecord(new DoubleWritable(0), new IntWritable(0), 
           new DoubleWritable(temperature), new IntWritable(1))); 
        } 

       } 
      } 

     } // end of map method 

     protected void cleanup(Context context) throws IOException, InterruptedException { 
      Iterator i = recordMap.keySet().iterator(); 
      String stationId=""; 
      while(i.hasNext()){ 
       stationId = i.next().toString(); 

       context.write(new Text(stationId),recordMap.get(stationId)); 
      } 
     } // end of cleanup 
    } // end of mapper class 


    public static class WeatherReduce extends Reducer<Text, WeatherRecord, Text, Text> { 

     protected void reduce(Text key, Iterator<WeatherRecord> values, Reducer<Text, WeatherRecord, Text, Text>.Context context) throws IOException, InterruptedException { 
      // initializing local variables to compute average 
      int maxCount =0; 
      int minCount=0; 
      double maxSum=0; 
      double minSum=0; 



      //iterating over list of values to compute average 
      while(values.hasNext()){ 
       WeatherRecord record = values.next(); 

       maxSum += Double.parseDouble(record.maxSum.toString()); 
       maxCount += Integer.parseInt(record.maxCount.toString()); 
       minSum += Double.parseDouble(record.minSum.toString()); 
       minCount+=Integer.parseInt(record.minCount.toString()); 

      } 

      // logic to handle divide by zero case 

      if(minCount==0){ 
       minCount=1; 
      } 
      if(maxCount==0){ 
       maxCount=1; 
      } 

      System.out.println("Min Sum is" + minSum); 

      context.write(new Text(key), new Text(","+(minSum/minCount)+","+(maxSum/maxCount))); 


     } 
    } 

    @Override 
    public int run(String[] args) throws Exception { 
     Configuration conf = new Configuration(); 
     args = new GenericOptionsParser(conf, args).getRemainingArgs(); 
     String input = args[0]; 
     String output = args[1]; 

     Job job = new Job(conf, "weather average"); 
     job.setJarByClass(WeatherMap.class); 
     job.setInputFormatClass(TextInputFormat.class); 
     job.setMapperClass(WeatherMap.class); 
     job.setMapOutputKeyClass(Text.class); 
     job.setMapOutputValueClass(WeatherRecord.class); 

     job.setReducerClass(WeatherReduce.class); 
     job.setOutputFormatClass(TextOutputFormat.class); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 

     FileInputFormat.setInputPaths(job, new Path(input)); 
     Path outPath = new Path(output); 
     FileOutputFormat.setOutputPath(job, outPath); 
     outPath.getFileSystem(conf).delete(outPath, true); 

     job.waitForCompletion(true); 
     return (job.waitForCompletion(true) ? 0 : 1); 
    } 

    public static void main(String[] args) throws Exception { 
     int exitCode = ToolRunner.run(new WeatherDriver(), args); 
     System.exit(exitCode); 
    } 
} 

期待している出力は「station_id, average_min_temp, average_max_temp」の形式で、例えば次のようになるはずです。

AGE00135039,123.12,11 

しかし実際には、期待した形式ではなく、以下のような出力になってしまいます。コードを分析したところ、Mapperのcontext.writeが出力ファイルに直接書き込んでいることが分かりました。

AGE00135039 [email protected] 
AGE00135039 [email protected] 
AGE00135039 [email protected] 
AGE00135039 [email protected] 
AGE00135039 [email protected] 
AGE00135039 [email protected] 
AGE00135039 [email protected] 
AGE00135039 [email protected] 
AGE00135039 [email protected] 
AGE00147704 [email protected] 
AGE00147704 [email protected] 
AGE00147704 [email protected] 
AGE00147704 [email protected] 
AGE00147704 [email protected] 
AGE00147704 [email protected] 

答えて

1

reduce()メソッドのシグネチャが正しくないため、ジョブはおそらくあなたのreduce()ではなく、基底クラスReducerのデフォルトのreduce()メソッド（受け取った値をそのまま出力する実装）を呼び出しています。現在のコードは下の1つ目のようになっていますが、正しくは2つ目のようにする必要があります。

protected void reduce(Text key, Iterator<WeatherRecord> values, Context context)

protected void reduce(Text key, Iterable<WeatherRecord> values, Context context)

IteratorからIterableへの変化に注意してください。

このような間違いを防ぐ方法の1つは、基底クラスの実装をオーバーライドするつもりのメソッドに@Overrideアノテーションを付けることです。実際にはオーバーライドになっていない場合、コンパイルエラーになります。

+0

これでうまくいきました。助けていただきありがとうございます :-) –

関連する問題