2016-12-07 10 views
0

私は、日付とポイント値をヘッダとして持つ200,000行のinput.csvファイル(25行のサンプルデータの要点:https://gist.githubusercontent.com/PatMulvihill/63effd90411efe858330b54a4111fadb/raw/4033695ba5ca2f439cfd1512358425643807d83b/input.csv)を取ることを目的とした3ノードのHadoop mapreduceの問題に取り組んでいます。プログラムは、次の値ではない任意のポイント値を見つける必要があります:200, 400, 600, 800, 1000, 1200, 1600, or 2000。そのポイント値は値でなければなりません。キーは、そのポイント値の前の値の日付からの年でなければなりません。たとえば、データがある場合 2000-05-25,400 2001-10-12, 650 2001-04-09, 700 還元剤に送る必要があるキーと値のペアは、<2001, 650><2001, 700>です。レデューサーは、各年度のすべての値の平均を取って、指定したhdfs /outパスにそれらのキーと値のペアを書き込む必要があります。プログラムはうまくコンパイルされますが、実際に何も出力することはありません。なぜそれを解決するために私ができることを知りたい。次のように 完全なコードは次のとおりです。Hadoop map-reducerは出力を書きません

import java.io.IOException; 
import java.util.Arrays; 
import java.util.List; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

public class JeopardyMR { 

public static class SplitterMapper extends Mapper <Object, Text, Text, IntWritable> { 

    public void map (Object key, Text value, Context context) throws IOException, InterruptedException { 
     // Convert the CSVString (of type Text) to a string 
     String CSVString = value.toString(); 
     // Split the string at each comma, creating an ArrayList with the different attributes in each index. 
     // Sometimes the questions will be split into multiple elements because they contain commas, but for the 
     // way we will be parsing the CSV's, it doesn't matter. 
     List<String> items = Arrays.asList(CSVString.split("\\s*,\\s*")); 
     // Loop through all the elements in the CSV 
     // Start i at 3 to ensure that you do not parse a point value that has a year absent from the data set. 
     // We can end the loop at items.size() w/o truncating the last 3 items because if we have a point value, we know 
     // that the corresponding year is in the items before it, not after it. 
     // We will miss 1 or 2 data points because of this, but it shouldn't matter too much because of the magnitude of our data set 
     // and the fact that a value has a low probability of actually being a daily double wager. 
     for (int i = 3; i < items.size(); i++) { 
      // We want a String version of the item that is being evaluated so that we can see if it matches the regex 
      String item = items.get(i); 
      if (item.matches("^\\d{4}\\-(0?[1-9]|1[012])\\-(0?[1-9]|[12][0-9]|3[01])$")) { 
       // Make sure that we don't get an out of bounds error when trying to access the next item 
       if (i + 1 >= items.size()) { 
        break; 
       } else { 
        // the wagerStr should always be the item after a valid air date 
        String wagerStr = items.get(i + 1); 
        int wager = Integer.parseInt(wagerStr); 
        // if a wager isn't the following values, assume that is a daily double wager 
        if (wager != 200 && wager != 400 && wager != 600 && wager != 800 && wager != 1000 && wager != 1200 && wager != 1600 && wager != 2000) { 
         // if we know that a point value of a question is in fact a daily double wager, find the year that the daily double happened 
         // the year will always be the first 4 digits of a valid date formatted YYYY-MM-DD 
         char[] airDateChars = item.toCharArray(); 
         String year = "" + airDateChars[0] + airDateChars[1] + airDateChars[2] + airDateChars[3]; 

         // output the follow key-value pair: <year, wager> 
         context.write(new Text(year), new IntWritable(wager)); 
        } 
       } 

      } 
     } 
    } 
} 

public static class IntSumReducer extends Reducer <Text, IntWritable, Text, IntWritable> { 

    private IntWritable result = new IntWritable(); 
    public void reduce (Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 
     int sum = 0, count = 0; 
     for (IntWritable val : values) { 
      sum += val.get(); 
      count++; 
     } 
     int avg = sum/count; 
     result.set(avg); 
     context.write(key, result); 
    } 
} 

public static void main (String[] args) throws Exception { 
    Configuration conf = new Configuration(); 
    Job job = Job.getInstance(conf, "jeopardy daily double wagers by year"); 
    job.setJarByClass(JeopardyMR.class); 
    job.setMapperClass(SplitterMapper.class); 
    job.setCombinerClass(IntSumReducer.class); 
    job.setReducerClass(IntSumReducer.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class); 
    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 
    System.exit(job.waitForCompletion(true) ? 0 : 1); 
} 
} 

成功したコンパイル端子出力がここで見つけることができます:https://gist.github.com/PatMulvihill/40b3207fe8af8de0b91afde61305b187 私はHadoopのマップ-削減に非常に新しいです、と私はおそらく非常に愚かな間違いを犯しています。私はここにあるコードからこのコードに基づいています:https://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html 有用な情報が欠落している場合はお知らせください。どんな助けもありがとう!ありがとうございました。

+2

データを書き込むかどうかを確認するために、例を簡略化して(フィルタリングを行わない)試してみてください。 – Serhiy

+0

問題がどこにあるかを確認するには、何かを読んでいないのですか?マップの入力レコードカウンターを確認してください。ゼロの場合は、入力パスを確認してください。あなたはマッパーから何も書いていないのでしょうか?マップ出力レコードカウンターを確認してください。パターンマッチをチェックして、それが印刷されているかどうかを確認するifの中に何かを印刷してみてください。 – vefthym

+0

@vefthymマップ入力レコードカウンタをチェックしましたが、それは予想される入力レコードの量に等しいです。入力ファイルをフォーマットしてhdfsに再追加することで、入力パスが正しいことを確認しました。私の問題は、マッパーから減速機に何も書き込まないということです。マップの出力レコードカウンタは、書き込まれたバイトと同様に0です。私のプログラムが正常にコンパイルされたときのターミナルメッセージの出力は次のとおりです。https://gist.github.com/PatMulvihill/40b3207fe8af8de0b91afde61305b187 –

答えて

1

items.size()が2であることを確認してください。マップの入力はファイルの行であり、各行のマップ実行されたマップ機能をマップしています。各行がセミコロンで分割されると、アイテムのサイズは2になり、次にアイテムのサイズが3より大きいときに実行されます。 マップ出力の書き込みバイトをチェックして、データの書き込みを確認できます。 EDIT: これでマップコードを置き換える:

public void map (Object key, Text value, Context context) throws IOException, InterruptedException { 
     String CSVString = value.toString(); 
     String[] yearsValue = CSVString.split("\\s*,\\s*"); 
     if(yearsValue.length == 2){ 
      int wager = Integer.parseInt(yearsValue[1]); 
      if (wager != 200 && wager != 400 && wager != 600 && wager != 800 && wager != 1000 && wager != 1200 && wager != 1600 && wager != 2000) { 
       char[] airDateChars = yearsValue[0].toCharArray(); 
       String year = "" + airDateChars[0] + airDateChars[1] + airDateChars[2] + airDateChars[3]; 
       context.write(new Text(year), new IntWritable(wager)); 

      } 
     }else{ 
      System.out.println(CSVString); 
     } 
} 
+0

ありがとう@vahid。私は私の "not-an-answer"旗とdownvoteを引っ込めました。しかし、あなたの答えはまだ間違っています。なぜなら、この場合のコンバイナの使用は正しいだけでなく、非常に良い方法です。この部分を削除することを検討してください。 – vefthym

+0

@vahid 'items.size()'が2つしかない理由は何ですか?私が理解することは、 'values'パラメータをStringにキャストしているときに、その特定のノードに与えられたすべての入力を含む1つの巨大なStringを作る際に、すべての値を取っていることです。私は何かを誤解していますか?また、 'items.size()'が2であることをどうやって判断しますか?私のプリントは、hadoopアプリケーションでは動作しません。そして、彼らは、彼らが想定しているログファイルに終わることはありません。 –

+0

@Pat Mulvihillマップのための2つの概念があります:1-マップタスク2-マップ関数。構成のMapreduceベースは入力データを分割してマップタスクに渡し、タスク分割入力データベースを新しいラインにマップし、各ラインにマップ機能を与えます。あなたはマップのタスクではないマップ関数を書く。 – vahid

0

私は実際に.txtファイルに私の.csvファイルを変換することによって、この問題を修正しました。これは実際のという問題の解決策ではありませんが、それが私のコードを動作させたのですが、今はそれがなぜ問題なのかを理解できるようになりました。さらに、これは将来誰かを助けるかもしれません!

関連する問題