2017-06-20 3 views
0

私は最近hadoopを学んでいます(私はhadoop2.7.3 & java 1.7.0.89を使用しています)。私は異なるテレビ放送の分析にいくつかのコードを書いて&異なるウェブサイトでのコメント。私はfilesinputformatクラスを書き直します。しかし、私はeclipse.there上で自分のコードを実行すると、多くの例外があります。私はEclipse上でデバッグしようとします。ただマッパーを見つける見つけるか、または減速は、いくつかの問題を抱えている..しかし、私はここに 書き換えられたfileInputFormatを使用したMapReduceは結果を出力できません

が日付例..です何が悪かったのかわからないんだけど、二次データは

truelove 3 3678 0 0 0 1 
truelove 2 39155 0 0 173 438 
truelove 1 142208 1 2 1 1 
truelove 1 142208 1 2 1 1 
truelove 1 142208 1 2 1 1 
frink 2 950 0 0 0 0 
frink 2 800 0 0 0 0 
daughter 4 4489850 0 0 0 0 
daughter 4 1161 0 0 0 0 
princess 2 33593 0 0 0 3 
princess 2 36118 0 0 0 2 
princess 2 38608 0 0 0 1 
princess 3 2542 0 0 0 0 
princess 1 35322 2 4 0 1 

、その後、ウェブサイトのニックネームであります私はのInputFormat

カスタムデータ形式

 package com.hadoop.mapreduce; 

    import java.io.DataInput; 
    import java.io.DataOutput; 
    import java.io.IOException; 

    import org.apache.hadoop.io.WritableComparable; 


    /*1-5means:1youku2souhu3tudou4aiqiyi5xunlei 
    princess 2 33593 0 0 0 3 
    princess 2 36118 0 0 0 2 
    princess 2 38608 0 0 0 1 
    princess 3 2542 0 0 0 0 
    princess 1 35322 2 4 0 1*/ 
    public class TVplaydata implements WritableComparable<Object>{ 
     //private String tvname; 

     private int tvplaynum; 
     private int tvfavorite; 
     private int tvcomment; 
     private int tvdown; 
     private int tvvote; 
    public TVplaydata(){} 
    public void set(int tvplaynum,int tvfavorite,int tvcomment,int tvdown,int tvvote){ 
     this.tvplaynum = tvplaynum; 
     this.tvfavorite = tvfavorite; 
     this.tvcomment = tvcomment; 
     this.tvdown = tvdown; 
     this.tvvote = tvvote; 
    } 
    //source get set 
    public void setTvpalynum(int tvplaynum) { 
     this.tvplaynum = tvplaynum; 
    } 
    public int getTvpalynum() { 
     return tvplaynum; 

    } 

    public int getTvfavorite() { 
     return tvfavorite; 
    } 
    public void setTvfavorite(int tvfavorite) { 
     this.tvfavorite = tvfavorite; 
    } 
    public int getTvcomment() { 
     return tvcomment; 
    } 
    public void setTvcomment(int tvcomment) { 
     this.tvcomment = tvcomment; 
    } 
    public int getTvdown() { 
     return tvdown; 
    } 
    public void setTvdown(int tvdown) { 
     this.tvdown = tvdown; 
    } 
    public int getTvvote() { 
     return tvvote; 
    } 
    public void setTvvote(int tvvote) { 
     this.tvvote = tvvote; 
    } 
     @Override 

     public void readFields(DataInput in) throws IOException { 
      // TODO Auto-generated method stub 
      tvplaynum = in.readInt(); 
      tvfavorite = in.readInt(); 
      tvcomment = in.readInt(); 
      tvdown = in.readInt(); 
      tvvote = in.readInt(); 
     } 

     @Override 

     public void write(DataOutput out) throws IOException { 
      // TODO Auto-generated method stub 
      out.writeInt(tvplaynum); 
      out.writeInt(tvfavorite); 
      out.writeInt(tvcomment); 
      out.writeInt(tvdown); 
      out.writeInt(tvvote); 
     } 

     @Override 
     public int compareTo(Object o) { 
      // TODO Auto-generated method stub 
      return 0; 
     } 

    } 

が、その後のInputFormatを書き換える書き換えます。最後

package com.hadoop.mapreduce; 
import java.io.IOException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataInputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.RecordReader; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.FileSplit; 
import org.apache.hadoop.util.LineReader; 



class PlayinputFormat extends FileInputFormat<Text, TVplaydata>{ 

    @Override 
    public RecordReader<Text, TVplaydata> createRecordReader(InputSplit input, TaskAttemptContext context) 
      throws IOException, InterruptedException { 
     // TODO Auto-generated method stub 

     return new TvplayRecordReader(); 
    } 
    class TvplayRecordReader extends RecordReader<Text, TVplaydata>{ 

     public LineReader in; 
     public Text lineKey; 
     public TVplaydata lineValue; 
     public Text line; 

     @Override 
     public void close() throws IOException { 
      // TODO Auto-generated method stub 
      if(in !=null){ 
       in.close(); 
      } 
     } 

     @Override 
     public Text getCurrentKey() throws IOException, InterruptedException { 
      // TODO Auto-generated method stub 
      return lineKey; 
     } 

     @Override 
     public TVplaydata getCurrentValue() throws IOException, InterruptedException { 
      // TODO Auto-generated method stub 
      return lineValue; 
     } 

     @Override 
     public float getProgress() throws IOException, InterruptedException { 
      // TODO Auto-generated method stub 
      return 0; 
     } 

     @Override 
     public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException { 
      // TODO Auto-generated method stub 
      FileSplit split=(FileSplit)input; 
      Configuration job=context.getConfiguration(); 
      Path file=split.getPath(); 
      FileSystem fs=file.getFileSystem(job); 

      FSDataInputStream filein=fs.open(file); //open 
      in=new LineReader(filein,job); 
      line=new Text(); 
      lineKey=new Text(); 
      lineValue = new TVplaydata(); 
     } 

     @Override 
     public boolean nextKeyValue() throws IOException, InterruptedException { 
      // TODO Auto-generated method stub 
      int linesize=in.readLine(line); 
      if(linesize==0) return false; 

      String[] pieces = line.toString().split("\t"); 
      if(pieces.length != 7){ 
       throw new IOException("Invalid record received"); 
      } 


      lineKey.set(pieces[0]+"\t"+pieces[1]); 
      lineValue.set(Integer.parseInt(pieces[2]),Integer.parseInt(pieces[3]),Integer.parseInt(pieces[4]) 
        ,Integer.parseInt(pieces[5]),Integer.parseInt(pieces[6])); 
      return true; 
     } 
    } 
} 

ここ

package com.hadoop.mapreduce; 

import java.io.IOException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 



public class TVPlay extends Configured implements Tool{ 
public static void main(String[] args) throws Exception { 
    String[] paths = {"hdfs://wang:9000/mapreduce/tvplay.txt","hdfs://wang:9000/mapreduce/tvout"}; 
    int ec = ToolRunner.run(new Configuration(), new TVPlay(), paths); 
    System.exit(ec); 
} 


     //mapper 
public class TVmapper extends Mapper<Text, TVplaydata, Text, TVplaydata> { 
    public void map(Text key,TVplaydata value, Context context) throws IOException, InterruptedException { 
       context.write(key, value); 
      } 
     } 
public class TVreducer extends Reducer<Text, TVplaydata, Text, Text>{ 
      private Text m_key = new Text(); 
      private Text m_value = new Text(); 
      private MultipleOutputs<Text, Text> mos; 
      protected void setup(Context context) throws IOException, 
        InterruptedException { 
       mos = new MultipleOutputs<Text, Text>(context); 
      } 
     public void reduce(Text key,Iterable<TVplaydata> values, Context context) throws IOException, InterruptedException{ 

       int tvplaynum = 0; 
       int tvfavorite = 0; 
       int tvcomment = 0; 
       int tvvote = 0; 
       int tvdown = 0; 
      for (TVplaydata tv:values) { 
       tvplaynum += tv.getTvpalynum(); 
       tvfavorite += tv.getTvfavorite(); 
       tvcomment += tv.getTvcomment(); 
       tvvote += tv.getTvvote(); 
       tvdown += tv.getTvdown(); 
       } 

      String[] records = key.toString().split("\t"); 

      String source = records[1]; 
      m_key.set(records[0]); 
      m_value.set(tvplaynum+"\t"+tvfavorite+"\t"+tvcomment+"\t"+tvdown+"\t"+tvvote); 

      if(source.equals("1")){ 
       mos.write("youku", m_key, m_value); 
      }else if (source.equals("2")) { 
       mos.write("souhu", m_key, m_value); 
      }else if (source.equals("3")) { 
       mos.write("tudou",m_key, m_value); 
      }else if (source.equals("4")) { 
       mos.write("aiqiyi", m_key, m_value); 
      }else if (source.equals("5")) { 
       mos.write("xunlei", m_key, m_value); 
      }else{ 
       mos.write("other", m_key, m_value); 
      } 
     } 
     protected void cleanup(Context context) throws IOException ,InterruptedException{ 
       mos.close(); 
     } 
     } 

     @Override 
     public int run(String[] arg0) throws Exception { 
      // TODO Auto-generated method stub 
      Configuration conf = new Configuration(); 

      Path path = new Path(arg0[1]); 
      FileSystem hdfs = path.getFileSystem(conf); 
      if(hdfs.isDirectory(path)){ 
       hdfs.delete(path, true); 
      } 
      Job job = new Job(conf,"tvplay"); 
      job.setJarByClass(TVPlay.class); 
      // set InputFormatClass 
      job.setInputFormatClass(PlayinputFormat.class); 
      // set mapper 
      job.setMapperClass(TVmapper.class); 
      job.setMapOutputKeyClass(Text.class); 
      job.setMapOutputValueClass(TVplaydata.class); 
      // set reduce 
      job.setReducerClass(TVreducer.class); 
      job.setOutputKeyClass(Text.class); 
      job.setOutputValueClass(Text.class); 

      FileInputFormat.addInputPath(job, new Path(arg0[0])); 
      FileOutputFormat.setOutputPath(job,new Path(arg0[1])); 
      MultipleOutputs.addNamedOutput(job, "youku", TextOutputFormat.class, 
        Text.class, Text.class); 
      MultipleOutputs.addNamedOutput(job, "souhu", TextOutputFormat.class, 
        Text.class, Text.class); 
      MultipleOutputs.addNamedOutput(job, "tudou", TextOutputFormat.class, 
        Text.class, Text.class); 
      MultipleOutputs.addNamedOutput(job, "aiqiyi", TextOutputFormat.class, 
        Text.class, Text.class); 
      MultipleOutputs.addNamedOutput(job, "xunlei", TextOutputFormat.class, 
        Text.class, Text.class); 
      MultipleOutputs.addNamedOutput(job, "other", TextOutputFormat.class, 
        Text.class, Text.class); 

      return job.waitForCompletion(true)?0:1; 
     } 
} 

マッパー&リデューサーを実行するために、mainメソッドを書くが、私はこれが何を意味するのか見当がつかない例外

2017-06-20 23:03:26,848 INFO [org.apache.hadoop.conf.Configuration.deprecation] - session.id is deprecated. Instead, use dfs.metrics.session-id 
2017-06-20 23:03:26,854 INFO [org.apache.hadoop.metrics.jvm.JvmMetrics] - Initializing JVM Metrics with processName=JobTracker, sessionId= 
2017-06-20 23:03:27,874 WARN [org.apache.hadoop.mapreduce.JobResourceUploader] - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 
2017-06-20 23:03:28,186 WARN [org.apache.hadoop.mapreduce.JobResourceUploader] - No job jar file set. User classes may not be found. See Job or Job#setJar(String). 
2017-06-20 23:03:28,236 INFO [org.apache.hadoop.mapreduce.lib.input.FileInputFormat] - Total input paths to process : 1 
2017-06-20 23:03:28,639 INFO [org.apache.hadoop.mapreduce.JobSubmitter] - number of splits:1 
2017-06-20 23:03:29,389 INFO [org.apache.hadoop.mapreduce.JobSubmitter] - Submitting tokens for job: job_local622257889_0001 
2017-06-20 23:03:30,552 INFO [org.apache.hadoop.mapreduce.Job] - The url to track the job: http://localhost:8080/ 
2017-06-20 23:03:30,556 INFO [org.apache.hadoop.mapreduce.Job] - Running job: job_local622257889_0001 
2017-06-20 23:03:30,607 INFO [org.apache.hadoop.mapred.LocalJobRunner] - OutputCommitter set in config null 
2017-06-20 23:03:30,630 INFO [org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter] - File Output Committer Algorithm version is 1 
2017-06-20 23:03:30,670 INFO [org.apache.hadoop.mapred.LocalJobRunner] - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 
2017-06-20 23:03:31,562 INFO [org.apache.hadoop.mapreduce.Job] - Job job_local622257889_0001 running in uber mode : false 
2017-06-20 23:03:31,567 INFO [org.apache.hadoop.mapreduce.Job] - map 0% reduce 0% 
2017-06-20 23:03:31,569 INFO [org.apache.hadoop.mapred.LocalJobRunner] - Waiting for map tasks 
2017-06-20 23:03:31,571 INFO [org.apache.hadoop.mapred.LocalJobRunner] - Starting task: attempt_local622257889_0001_m_000000_0 
2017-06-20 23:03:31,667 INFO [org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter] - File Output Committer Algorithm version is 1 
2017-06-20 23:03:31,691 INFO [org.apache.hadoop.yarn.util.ProcfsBasedProcessTree] - ProcfsBasedProcessTree currently is supported only on Linux. 
2017-06-20 23:03:34,256 INFO [org.apache.hadoop.mapred.Task] - Using ResourceCalculatorProcessTree : [email protected] 
2017-06-20 23:03:34,259 INFO [org.apache.hadoop.mapred.LocalJobRunner] - map task executor complete. 
2017-06-20 23:03:34,485 WARN [org.apache.hadoop.mapred.LocalJobRunner] - job_local622257889_0001 
java.lang.Exception: java.lang.RuntimeException: java.lang.NoSuchMethodException: com.hadoop.mapreduce.TVPlay$TVmapper.<init>() 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462) 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522) 
Caused by: java.lang.RuntimeException: java.lang.NoSuchMethodException: com.hadoop.mapreduce.TVPlay$TVmapper.<init>() 
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:134) 
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:745) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341) 
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
    at java.util.concurrent.FutureTask.run(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 
Caused by: java.lang.NoSuchMethodException: com.hadoop.mapreduce.TVPlay$TVmapper.<init>() 
    at java.lang.Class.getConstructor0(Unknown Source) 
    at java.lang.Class.getDeclaredConstructor(Unknown Source) 
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:128) 
    ... 8 more 
2017-06-20 23:03:34,574 INFO [org.apache.hadoop.mapreduce.Job] - Job job_local622257889_0001 failed with state FAILED due to: NA 
2017-06-20 23:03:34,598 INFO [org.apache.hadoop.mapreduce.Job] - Counters: 0 

です。あなたの助けを考えます!

+0

MRジョブの実行に使用したコマンドも表示できますか –

答えて

0

最後に、私はマッパを見つける&還元剤ではない静的メソッド。私はそれらを静的メソッドに変更してもOKです。

Why declaring Mapper and Reducer classes as static?

Hadoopの各マップのクラスのインスタンスを作成するか、実行するタスクを低減するためにリフレクションを使用します。作成された新しいインスタンスでは、引数のないコンストラクタが必要です(それ以外の場合は、どのように渡すべきかが分かります)。

静的キーワードなしで内部マッパーまたはクラスを宣言することで、Javaコンパイルは実際には構築時に親クラスのインスタンスが渡されることを期待するコンストラクタを作成します。あなたはまた、生成されたクラスファイル

に対しててjavapコマンドを実行して、これを見ることができるはず

あなたがトップでそれを見たことがない理由は、親クラスの宣言(で使用する場合、静的なキーワードが有効ではありませんレベル、ただし子クラスのみ)

0

還元機のrun()メソッドにジョブ設定を入れましたが間違っています。最も簡単な解決策は、このコードをすべてmain()メソッドTVplayに移動することです。

あなたが何をしているのかわからない限り、一般的にrun()メソッドをオーバーライドしてはいけません。これをreduceクラスから完全に削除します。

関連する問題