2016-06-15 3 views
1

レデューサーが67%に達すると、タイムアウト例外が発生します。これはソートフェーズ後で、フェーズを減らす前です。問題を解決するためにどのパラメータを調べるべきかを教えてください。Mapreduce - レデューサーが67%に達するまでのタイムアウト

16/06/15 16:58:13 INFO mapreduce.Job: map 100% reduce 0% 
16/06/15 16:58:23 INFO mapreduce.Job: map 100% reduce 24% 
16/06/15 16:59:05 INFO mapreduce.Job: map 100% reduce 28% 
16/06/15 16:59:08 INFO mapreduce.Job: map 100% reduce 30% 
16/06/15 16:59:39 INFO mapreduce.Job: map 100% reduce 33% 
16/06/15 17:00:09 INFO mapreduce.Job: map 100% reduce 52% 
16/06/15 17:00:12 INFO mapreduce.Job: map 100% reduce 67% 
16/06/15 17:05:42 INFO mapreduce.Job: Task Id : attempt_1465992294703_0001_r_000000_2, Status : FAILED 

ドライバクラス

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.CSVLineRecordReader; 
import org.apache.hadoop.mapreduce.lib.input.CSVNLineInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 

public class ExchgLogsTransposeDriver extends Configured implements Tool { 


    public int run(String[] args) throws Exception { 
     @SuppressWarnings("deprecation") 
     Configuration conf = getConf(); 
     String outPath=null; 
     String inPath=null; 

     if(args==null ||args.length==0){ 
      inPath="C:\\HadoopWS\\infile\\"; 
      outPath="C:\\HadoopWS\\outfile\\"; 

     }else{ 
      inPath=args[0]; 
      outPath=args[1]; 


     } 


     Path output =new Path(outPath); 
     Path input =new Path(inPath); 

     FileSystem hdfs = FileSystem.get(conf); 
     if (hdfs.exists(output)) { 
      hdfs.delete(output, true); 
     } 
     conf.set(CSVLineRecordReader.FORMAT_DELIMITER, "\""); 
     conf.set(CSVLineRecordReader.FORMAT_SEPARATOR, ","); 
     conf.setInt(CSVNLineInputFormat.LINES_PER_MAP, 500000); 
     conf.setBoolean(CSVLineRecordReader.IS_ZIPFILE, false); 
     Job job = new Job(conf); 

     job.setJarByClass(ExchgLogsTransposeDriver.class); 
     job.setMapperClass(ExchgLogsMapper.class); 
     job.setMapOutputKeyClass(CompositeKey.class); 
     job.setMapOutputValueClass(CompositeWritable.class); 
//  job.setNumReduceTasks(2); 
     job.setMapSpeculativeExecution(true); 

     job.setPartitionerClass(ActualKeyPartitioner.class); 
     job.setGroupingComparatorClass(ActualKeyGroupingComparator.class); 
     job.setSortComparatorClass(CompositeKeyComparator.class); 
     job.setReducerClass(ExchgLogsReducer.class); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(CompositeWritable.class); 

     job.getConfiguration().set("mapreduce.output.basename", input.getName()); 
     job.getConfiguration().set("mapreduce.map.output.compress", "true"); 
//  job.getConfiguration().set("mapreduce.map.output.compress.codec", "com.hadoop.compression.lzo.LzoCodec"); 


     job.setInputFormatClass(CSVNLineInputFormat.class); 
     job.setOutputFormatClass(TextOutputFormat.class); 
     FileInputFormat.setInputDirRecursive(job, true); 
     FileInputFormat.addInputPath(job, new Path(inPath)); 
     FileOutputFormat.setOutputPath(job, new Path(outPath)); 

     return job.waitForCompletion(true) ? 0 : 1; 
    } 

    public static void main(String args[]) throws Exception { 
     System.exit(ToolRunner.run(new ExchgLogsTransposeDriver(), args)); 
    } 
} 

リデューサークラスは

import java.io.IOException; 
import java.text.ParseException; 
import java.text.SimpleDateFormat; 
import java.util.ArrayList; 
import java.util.Calendar; 
import java.util.Date; 
import java.util.Iterator; 
import java.util.List; 
import java.util.concurrent.TimeUnit; 

import org.apache.commons.logging.Log; 
import org.apache.commons.logging.LogFactory; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 

public class ExchgLogsReducer extends Reducer<CompositeKey, CompositeWritable, NullWritable, Text> { 
    Log log = LogFactory.getLog(ExchgLogsReducer.class); 
    public static final String NEW = "NEW"; 
    public static final String FW = "FW"; 
    public static final String RE = "RE"; 
    public static final int ZERO = 0; 
    Text res = new Text(); 

    @Override 
    public void reduce(CompositeKey key, Iterable<CompositeWritable> value, Context context) 
      throws IOException, InterruptedException { 
     List<CompositeValueObj> cache = new ArrayList<CompositeValueObj>(); 

     StringBuilder response = new StringBuilder(); 
     Iterator<CompositeWritable> it = value.iterator(); 
     while (it.hasNext()) { 
      CompositeWritable currWritable = new CompositeWritable(); 
      currWritable = it.next(); 
      CompositeValueObj obj = new CompositeValueObj(); 
      obj.setRecepient((currWritable.getRecepient().toString())); 
      obj.setSender(currWritable.getSender().toString()); 
      obj.setType(currWritable.getType().toString()); 
      obj.setTimestamp(currWritable.getTimestamp().toString()); 
      cache.add(obj); 
      // System.out.println(new Text(" "+"\t" + obj.getRecepient() + "\t" 
      // + obj.getSender() + "\t" +obj.getType()+ "\t" + 
      // obj.getTimestamp())); 

     } 

     for (int i = 0; i < cache.size(); i++) { 
      CompositeValueObj currobj = cache.get(i); 
      String receiver = currobj.getRecepient().toString(); 
      String origSender = currobj.getSender().toString(); 

      String dateFrom = currobj.getTimestamp().toString(); 
      System.out.println(key.getSubject() + " " + "i==>" + i + cache.size()); 
      for (int j = i + 1; j < cache.size(); j++) { 
       response = new StringBuilder(key.getSubject()).append(",").append(receiver).append(","); 
       CompositeValueObj nextObj = cache.get(j); 
       System.out.println(key.getSubject() + " " + "j==>" + j); 

       String dateTo = nextObj.getTimestamp().toString(); 
       String newSender = nextObj.getSender().toString(); 
       String newRecepient = nextObj.getRecepient().toString(); 
       String mailType = nextObj.getType().toString(); 

       // System.out.println(mailType+ "==>"+receiver+ 
       // "==>"+newRecepient); 

       if (receiver.equals(newRecepient)) { 
        response.append(origSender).append(","); 
        response.append("N,0,0,").append(dateFrom); 
        break; 
       } 

       if (receiver.equals(newSender) && ((mailType.equals(RE) || (mailType.equals(FW))))) { 
        if (mailType.equals(RE)) { 
         response.append(origSender).append(","); 
         response.append("Y,"); 
         response.append(getTimeDiff(dateFrom, dateTo)); 
         response.append(",0,").append(dateTo); 
         break; 
        } 

        if (mailType.equals(FW)) { 
         response.append(origSender).append(","); 
         response.append("Y,0,"); 
         response.append(getTimeDiff(dateFrom, dateTo)); 
         response.append(",").append(dateTo); 
         break; 
        } 

       } else { 
        response.append(origSender).append(","); 
        response.append("N,0,0,").append(dateFrom); 
       } 

      } 
      if (i + 1 == cache.size()) { 
       response = new StringBuilder(key.getSubject()).append(",").append(receiver).append(","); 
       response.append(origSender).append(","); 
       response.append("N,0,0,").append(dateFrom); 

      } 
      res.set(response.toString()); 
      // System.err.println(key.getSubject()+new 
      // Text(response.toString())); 
      context.write(NullWritable.get(), res); 
     } 

    } 

    private static double getTimeDiff(String date1, String date2) { 
     double diff = 0; 
     double weekend = 0; 
     boolean isWEchain=false; 
     boolean isWESent=false; 

     if (date1 == null || date2 == null) { 
      return 0; 
     } 
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"); 
     try { 
      Date from = sdf.parse(date1); 
      Date to = sdf.parse(date2); 
      Calendar cal1 = Calendar.getInstance(); 
      Calendar cal2 = Calendar.getInstance(); 
      cal1.setTime(from); 
      cal2.setTime(to); 
      int noOfDaysWE = 0; 
      System.out.println(cal1.get(Calendar.DAY_OF_WEEK)); 
      System.out.println(cal2.get(Calendar.DAY_OF_WEEK)); 
      if ((((Calendar.FRIDAY == cal1.get(Calendar.DAY_OF_WEEK)) 
        || (Calendar.SATURDAY == cal1.get(Calendar.DAY_OF_WEEK))) 
        && ((Calendar.FRIDAY == cal2.get(Calendar.DAY_OF_WEEK)) 
          || (Calendar.SATURDAY == cal2.get(Calendar.DAY_OF_WEEK)))) 
        ) { 

       isWEchain =true; 
      }else if((((Calendar.FRIDAY == cal1.get(Calendar.DAY_OF_WEEK)) 
        || (Calendar.SATURDAY == cal1.get(Calendar.DAY_OF_WEEK))) 
        && (((Calendar.FRIDAY != cal2.get(Calendar.DAY_OF_WEEK)) 
          && (Calendar.SATURDAY != cal2.get(Calendar.DAY_OF_WEEK)))))){ 
       isWESent=true; 
       if(Calendar.FRIDAY == cal1.get(Calendar.DAY_OF_WEEK)){ 
        cal1.add(Calendar.DATE, 1); 

       } 
       cal1.set(Calendar.HOUR,20); 
       cal1.set(Calendar.MINUTE,0); 
       cal1.set(Calendar.SECOND,0); 
       cal1.set(Calendar.MILLISECOND,0); 
      } 
      System.out.println(cal1.getTime()); 
      System.out.println(cal2.getTime()); 
      System.out.println(isWESent); 
      diff=cal2.getTimeInMillis() - cal1.getTimeInMillis(); 
      if(diff < 0){ 
      return 0; 
      } 

      while (cal1.before(cal2)) { 
       if ((Calendar.FRIDAY == cal1.get(Calendar.DAY_OF_WEEK)) 
         || (Calendar.SATURDAY == cal1.get(Calendar.DAY_OF_WEEK))) { 
        noOfDaysWE++; 
       } 
       cal1.add(Calendar.DATE, 1); 
      } 




      if (noOfDaysWE != 0) { 
       weekend = TimeUnit.DAYS.toMillis(noOfDaysWE); 
      } 
      if(isWEchain && (noOfDaysWE <= 2)){ 
       return 0; 

      } 

      System.out.println(diff); 
      diff = diff - weekend; 
     } catch (ParseException e) { 
      return 0; 
     } 

     if (diff != 0) 
      return diff/1000; 
     else 
      return 0; 
    } 

    public static void main(String[] a) { 

     System.out.println(getTimeDiff("2016-06-03T19:41:48.781Z", "2016-06-05T07:21:01.000Z")); 
    } 

} 
+0

あなたはいくつのレデューサーを設定しましたか? – cyberpirate92

+1

今まで何を試してみましたか?また、いくつかのコードスニペットを入れてください。レジューサーコードだけにいくつかの問題があるように見えます。 –

+0

@cyberPheonixデフォルトでは、1つの減速機のみが作成されます。 –

答えて

1

多くのログを書き込んでいたループ内の減速コードには、いくつかのsysoutsがありませんでした。それらを削除した後、減速機は数分で終了します。

1

mapred-site.xmlでミリ秒単位である中mapred.task.timeoutに見てください。

プロパティを変更した後、あなたはすべてのtranckersを再起動する必要があります(ジョブ+タスク)

ヒント:あなたは、実行時にすべての設定を印刷したい場合に適用されるかどうかを確認するかのコードスニペットの下にこれを使用しないことドライバ。 EX用 :

final JobConf conf = new JobConf(config, this.getClass()); 

try { 
      conf.writeXml(System.out); 
     } catch (final IOException e) { 
      e.printStackTrace(); 
     } 
+0

..あなたの入力に感謝します。このパラメータは私たちの場合300000です。ドキュメントごとに、「入力を読み取ったり、出力を書き込んだり、ステータス文字列を更新したりしなければ、タスクが終了するまでのミリ秒数。だから私はクラスターやコードで問題がある場合はそれが修正を必要とするか、私のタイムアウトは、変更する必要がある最適な値ではないことを疑う。 –

+0

時間を600000に増やそうとしましたが、600秒後にもう一度タイムアウトします。 –

+0

あなたはあなたのレデューサーがやっていることのコードスニペットを与えることができますか?上記のようにすべての設定を印刷しましたか?もしそうなら、あなたはそれのために変更された価値を得ることができますか? –

1

% 67から-100%削減、実際のコードが実行されますので、あなたの削減タスクのどれもが完了していない得ています。減速機に行くデータが多すぎるかもしれないし、ある種の無限ループがあるかもしれません。

+0

サイズが3GBの比較的小さいファイルで実行すると、レデューサーが正常に完成します。だから、無限ループがないかもしれない。減速機の数を増やすことは役に立ちます。しかし、ここで議論されているように減速機の数を増やすといくつかの例外に直面しています。 http://stackoverflow.com/questions/37773114/mapreduce-job-taking-to-long-to-complete –

関連する問題