2016-05-12 15 views

答えて

2

Apacheのドキュメントpageのサンプルコードを引用して、さらに質問に答えています。

ワードカウントの主な方法を持つDriverクラスは、次のように定義されています。今Jobクラスのgrepcodeのウェブサイトから

public static void main(String[] args) throws Exception { 
    Configuration conf = new Configuration(); 
    Job job = Job.getInstance(conf, "word count"); 
    job.setJarByClass(WordCount.class); 
    job.setMapperClass(TokenizerMapper.class); 
    job.setCombinerClass(IntSumReducer.class); 
    job.setReducerClass(IntSumReducer.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class); 
    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 
    System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 

は、バック何が起こっているかを追跡するときにJobクラスのwaitForCompletion方法。

/** 
    * Submit the job to the cluster and wait for it to finish. 
    * @param verbose print the progress to the user 
    * @return true if the job succeeded 
    * @throws IOException thrown if the communication with the 
    *   <code>JobTracker</code> is lost 
    */ 
    public boolean waitForCompletion(boolean verbose 
            ) throws IOException, InterruptedException, 
              ClassNotFoundException { 
    if (state == JobState.DEFINE) { 
     submit(); 
    } 
    if (verbose) { 
     jobClient.monitorAndPrintJob(conf, info); 
    } else { 
     info.waitForCompletion(); 
    } 
    return isSuccessful(); 
    } 

} 

は今Jobクラスでsubmit()メソッドのコードを確認してください。 JobClientクラスのgrepcodeサイトから

/** 
    * Submit the job to the cluster and return immediately. 
    * @throws IOException 
    */ 
    public void submit() throws IOException, InterruptedException, 
           ClassNotFoundException { 
    ensureState(JobState.DEFINE); 
    setUseNewAPI(); 

    // Connect to the JobTracker and submit the job 
    connect(); 
    info = jobClient.submitJobInternal(conf); 
    super.setJobID(info.getID()); 
    state = JobState.RUNNING; 
    } 

公共

RunningJob submitJobInternal(final JobConf job 
           ) throws FileNotFoundException, 
             ClassNotFoundException, 
             InterruptedException, 
             IOException 

チェックのソースコードは、grepcodeとともに、内部用ポスト下記をご参照ください。上記の例では

What is the difference between JobClient.java and JobSubmitter.java in hadoop2?

+0

バックトラックは私を助けました:)あなた –

-1

これは、Javaでのmapreduceスクリプトの書き込みの基本的な例です。 PythonやC++などの他の言語でmapreduce-streamingを使用することもできますが、Javaは自国語です。

機能MapReduceは、入力ファイル名、出力ファイル名、および実行時パラメータとしてenvironmentalsが確立されている場合Mainクラスから呼び出されます。

import java.io.IOException; 
import java.util.*; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class WordCount { 

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { 
    private final static IntWritable one = new IntWritable(1); 
    private Text word = new Text(); 

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
     String line = value.toString(); 
     StringTokenizer tokenizer = new StringTokenizer(line); 
     while (tokenizer.hasMoreTokens()) { 
     word.set(tokenizer.nextToken()); 
     output.collect(word, one); 
     } 
    } 
    } 

    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { 
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
     int sum = 0; 
     while (values.hasNext()) { 
     sum += values.next().get(); 
     } 
     output.collect(key, new IntWritable(sum)); 
    } 
    } 

    public static void main(String[] args) throws Exception { 
    JobConf conf = new JobConf(WordCount.class); 
    conf.setJobName("wordcount"); 

    conf.setOutputKeyClass(Text.class); 
    conf.setOutputValueClass(IntWritable.class); 

    conf.setMapperClass(Map.class); 
    conf.setCombinerClass(Reduce.class); 
    conf.setReducerClass(Reduce.class); 

    conf.setInputFormat(TextInputFormat.class); 
    conf.setOutputFormat(TextOutputFormat.class); 

    FileInputFormat.setInputPaths(conf, new Path(args[0])); 
    FileOutputFormat.setOutputPath(conf, new Path(args[1])); 

    JobClient.runJob(conf); 
    } 
} 

あなたは、これは完全にここでApacheのチュートリアルで説明さ見ることができますこの例ではhttps://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html#Example%3A+WordCount+v1.0

、マップ関数の値を整理し、キーと値のペアとしてなど<word, 1>それらを割り当て、低減機能にハンドオフするペアをソート。 reduce関数は、集計を実行します。

これは非常に長い習慣の始まりですが、マップの主な概念を生み出し、集約に必要なキーと値のペアを作成し、集約と応答を減らします。どちらもデータノードで実行されます。これは分散レプリケーションを提供することで処理速度が向上します。

これが役に立ちます。

+0

、マッパークラスのマップ方法は、このオーバーライドマップメソッドが呼び出されるところから地図class.Nowによってオーバーライドを取得し、メインから、我々はちょうどconf.setMapperClass(Map.class)を設定している。しかし、内部的にMapクラスのオブジェクトがどのようにマップメソッドをオーバーライドしたか?この多態性が内部的にどのように達成されるか説明できますか? –

+0

あなたは何を達成しようとしていますか?あなたは別の質問に答えていますか?または何かを達成しようとしていますか? – arcee123

関連する問題