2017-07-20 22 views
1

すべての列のペアについてピアソンの相関係数を求めようとしていますが、MapReduceのコードを実行するとエラーが発生します。まず、IndexPairのコードは次のとおりです:

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 

import org.apache.hadoop.io.WritableComparable; 

/**
 * Composite key identifying a pair of column indices {@code (i, j)}.
 *
 * <p>Serialized as two consecutive longs ({@code i} then {@code j}) and ordered
 * first by {@code i}, then by {@code j}.
 *
 * <p>{@link #equals(Object)} and {@link #hashCode()} are value-based so that two
 * separately deserialized instances carrying the same indices are treated as the
 * same key. Hash-based partitioning relies on this; with the inherited identity
 * hash, equal keys could be routed to different reducers.
 */
public class IndexPair implements WritableComparable<IndexPair> {
    /** Human-readable column names, indexed by column position; used by {@link #toString()}. */
    public static String[] labels
            = {"Year", "Month", "MEI", "CO2", "CH4", "N2O", "CFC-11", "CFC-12", "TSI", "Aerosols", "Temp"};

    /** Column indices of the pair. */
    public long i, j;

    /** No-arg constructor; fields are populated later through {@link #readFields(DataInput)}. */
    public IndexPair() {
    }

    /**
     * @param i index of the first column
     * @param j index of the second column
     */
    public IndexPair(long i, long j) {
        this.i = i;
        this.j = j;
    }

    /** Reads {@code i} then {@code j}, mirroring {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        i = in.readLong();
        j = in.readLong();
    }

    /** Writes {@code i} then {@code j} as longs. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(i);
        out.writeLong(j);
    }

    /** Orders by {@code i}, breaking ties by {@code j}. */
    @Override
    public int compareTo(IndexPair o) {
        // Long.compare avoids boxing four Longs on every comparison during the sort.
        int result = Long.compare(i, o.i);
        if (result != 0) {
            return result;
        }
        return Long.compare(j, o.j);
    }

    /** Two pairs are equal when both indices match; consistent with {@link #compareTo}. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof IndexPair)) {
            return false;
        }
        IndexPair other = (IndexPair) obj;
        return i == other.i && j == other.j;
    }

    /** Value-based hash derived only from {@code i} and {@code j}, so it is stable across JVMs. */
    @Override
    public int hashCode() {
        int hashI = (int) (i ^ (i >>> 32));
        int hashJ = (int) (j ^ (j >>> 32));
        return 31 * hashI + hashJ;
    }

    /** Describes the pair using the column labels. */
    @Override
    public String toString() {
        return "Corelation between column " + labelFor(i) + "-->" + labelFor(j);
    }

    /**
     * Returns the label for a column index, or the index itself as text when it
     * falls outside {@link #labels} (e.g. input with more columns than labels),
     * instead of throwing {@link ArrayIndexOutOfBoundsException}.
     */
    private static String labelFor(long index) {
        if (index >= 0 && index < labels.length) {
            return labels[(int) index];
        }
        return String.valueOf(index);
    }

}

次に、MapReduceジョブ本体(Pearsonクラス)のコードは次のとおりです:

import java.io.IOException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.DoubleWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

public class Pearson 
{ 
    public static class MyMapper extends Mapper<LongWritable,Text,IndexPair,ValuePair>{ 
     public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 
      String line = value.toString(); 
      String[] tokens = line.split(","); 
      double[] arr = toDouble(tokens); 

      for(int i=0; i < arr.length; i++) { 
       for(int j=i+1; j < arr.length; j++) { 
       IndexPair k2 = new IndexPair(i, j); 
       ValuePair v2 = new ValuePair(arr[i], arr[j]); 
       context.write(k2, v2); 
       } 
       } 
      } 
      public double[] toDouble(String[] tokens) { 
      double[] arr = new double[tokens.length]; 
      for(int i=0; i < tokens.length; i++) { 
       arr[i] = Double.parseDouble(tokens[i]); 
      } 
      return arr; 
     } 
    } 
    public static class MyReduce extends Reducer<IndexPair,ValuePair,IndexPair,DoubleWritable> 
    { 
     public void reduce(IndexPair key, Iterable<ValuePair> values, Context context) throws IOException, InterruptedException { 
      double x = 0.0d; 
      double y = 0.0d; 
      double xx = 0.0d; 
      double yy = 0.0d; 
      double xy = 0.0d; 
      double n = 0.0d; 

      for(ValuePair pairs : values) { 
       x += pairs.v1; 
       y += pairs.v2; 
       xx += Math.pow(pairs.v1, 2.0d); 
       yy += Math.pow(pairs.v2, 2.0d); 
       xy += (pairs.v1 * pairs.v2); 
       n += 1.0d; 
      } 
      double numerator = xy - ((x * y)/n); 
      double denominator1 = xx - (Math.pow(x, 2.0d)/n); 
      double denominator2 = yy - (Math.pow(y, 2.0d)/n); 
      double denominator = Math.sqrt(denominator1 * denominator2); 
      double corr = numerator/denominator; 
      context.write(key, new DoubleWritable(corr)); 
      } 
    } 
    public static void main(String[] args) throws Exception 
    { 
      Configuration conf = new Configuration(); 
      Job job = Job.getInstance(conf, "Pearson's Correlation"); 
      job.setJarByClass(Pearson.class); 
      job.setMapperClass(MyMapper.class); 
      job.setCombinerClass(MyReduce.class); 
      job.setReducerClass(MyReduce.class); 

      job.setMapOutputKeyClass(IndexPair.class); 
      job.setMapOutputValueClass(ValuePair.class); 
      job.setOutputKeyClass(IndexPair.class); 
      job.setOutputValueClass(DoubleWritable.class); 

      FileInputFormat.addInputPath(job, new Path(args[0])); 
      FileOutputFormat.setOutputPath(job, new Path(args[1])); 
      System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
} 

そして、値のペア(ValuePairクラス)のコードは次のとおりです:

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 

import org.apache.hadoop.io.WritableComparable; 

/**
 * Pair of double values {@code (v1, v2)}, serialized as two consecutive doubles.
 *
 * <p>Ordering, equality and hashing are all value-based and mutually consistent,
 * so instances behave sensibly if they are ever sorted or placed in hash-based
 * collections.
 */
public class ValuePair implements WritableComparable<ValuePair> {

    /** The two values of the pair. */
    public double v1, v2;

    /** No-arg constructor; fields are populated later through {@link #readFields(DataInput)}. */
    public ValuePair() {
    }

    /**
     * @param i first value, stored in {@code v1}
     * @param j second value, stored in {@code v2}
     */
    public ValuePair(double i, double j) {
        this.v1 = i;
        this.v2 = j;
    }

    /** Reads {@code v1} then {@code v2}, mirroring {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        v1 = in.readDouble();
        v2 = in.readDouble();
    }

    /** Writes {@code v1} then {@code v2} as doubles. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeDouble(v1);
        out.writeDouble(v2);
    }

    /**
     * Orders by {@code v1}, breaking ties by {@code v2}, using
     * {@link Double#compare(double, double)} semantics.
     */
    @Override
    public int compareTo(ValuePair o) {
        int result = Double.compare(v1, o.v1);
        if (result != 0) {
            return result;
        }
        return Double.compare(v2, o.v2);
    }

    /** Equal when both components compare as equal under {@link Double#compare(double, double)}. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof ValuePair)) {
            return false;
        }
        ValuePair other = (ValuePair) obj;
        return Double.compare(v1, other.v1) == 0 && Double.compare(v2, other.v2) == 0;
    }

    /** Hash derived from the bit patterns of both components; consistent with {@link #equals}. */
    @Override
    public int hashCode() {
        long bits1 = Double.doubleToLongBits(v1);
        long bits2 = Double.doubleToLongBits(v2);
        int hash1 = (int) (bits1 ^ (bits1 >>> 32));
        int hash2 = (int) (bits2 ^ (bits2 >>> 32));
        return 31 * hash1 + hash2;
    }

}

しかし、これを実行しようとすると、次のエラーが発生します。

17/07/20 13:59:49 INFO mapreduce.Job: map 0% reduce 0% 
17/07/20 13:59:53 INFO mapreduce.Job: Task Id : attempt_1500536519279_0007_m_000000_0, Status : FAILED 
Error: java.io.IOException: wrong value class: class org.apache.hadoop.io.DoubleWritable is not class ValuePair 
    at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:194) 
    at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1411) 
    at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1728) 
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89) 
    at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105) 
    at Pearson$MyReduce.reduce(Pearson.java:66) 
    at Pearson$MyReduce.reduce(Pearson.java:1) 
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171) 
    at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1749) 
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1639) 
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1491) 
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:723) 
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:793) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341) 
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:175) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:422) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1807) 
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:169) 
あなたは次のようにリデューサをコンバイナとして使用していますが、

job.setCombinerClass(MyReduce.class);

コンバイナの出力キーと値の型は、マッパーの出力のものと同じでなければなりません。

答えて

0

問題は、リデューサをコンバイナとして使用していることです。コンバイナがマッパーの出力とは異なる型のペアを出力しようとするため、エラーになります。

+0

ありがとうございました.... –

関連する問題