2016-10-15 11 views
0

ファイルセット内のフレーズを検索するためにhadoopでmapreduceプログラムを作成しています。私はmapreduceの2つの仕事の連鎖を使用しています。しかし問題は、2番目のジョブが終了していないことです。以下に示すように、「減速タスク実行者完了」を表示しています。しかし、それは終了されていません。mapreduceジョブのチェーン内の2番目のジョブがHadoopで終了していません

16/10/15 19:04:16 INFO mapred.Task: Task:attempt_local1574338353_0002_r_000000_0 is done. And is in the process of committing 
16/10/15 19:04:16 INFO mapred.LocalJobRunner: 1/1 copied. 
16/10/15 19:04:16 INFO mapred.Task: Task attempt_local1574338353_0002_r_000000_0 is allowed to commit now 
16/10/15 19:04:16 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1574338353_0002_r_000000_0' to hdfs://localhost:54310/gopal/output/_temporary/0/task_local1574338353_0002_r_000000 
16/10/15 19:04:16 INFO mapred.LocalJobRunner: reduce > reduce 
16/10/15 19:04:16 INFO mapred.Task: Task 'attempt_local1574338353_0002_r_000000_0' done. 
16/10/15 19:04:16 INFO mapred.LocalJobRunner: Finishing task: attempt_local1574338353_0002_r_000000_0 
16/10/15 19:04:16 INFO mapred.LocalJobRunner: reduce task executor complete. 

ここは私のコードです。

public int run(String[] arg0) throws Exception 
{ 

    Configuration conf = getConf(); 
    conf.set("mapred.textoutputformat.separator", ";"); 
    System.out.println("********* First Job Started **********"); 

    JobConf job = new JobConf(conf, WordCount.class);   


    Path in = new Path(arg0[0]); 
    Path out = new Path("/gopal/temp"); 


    FileInputFormat.addInputPath(job, in); 
    FileOutputFormat.setOutputPath(job, out); 

    job.setJobName("Inverted Index");  

    job.setMapperClass(TokenizerMapper.class); 
    job.setReducerClass(IntSumReducer.class); 
    job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(MyPair.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(MyArrayWritable.class); 

    String phrase = ""; 
    phrase = phrase + "Sachin " + "Tendulkar"; 

    JobConf job2 = null; 

    System.out.println("Time taken by first map task: "+elapsedTime1); 
    System.out.println("Time taken by first reduce task: "+elapsedTime2); 

    System.out.println("********* Second Job Started **********"); 

    Configuration conf2 = getConf(); 

    job2 = new JobConf(conf2, WordCount.class); 

    Path out2 = new Path(arg0[1]); 

    FileInputFormat.addInputPath(job2, out); 
    FileOutputFormat.setOutputPath(job2, out2); 

    job2.setJobName("SearchQuery Mapper"); 

    job2.set("PhraseSearch", phrase); 


    job2.setMapperClass(QuerySearchMapper.class); 
    job2.setReducerClass(QuerySearchReducer.class); 

    job2.setMapOutputKeyClass(Text.class); 
    job2.setMapOutputValueClass(MyArrayWritable.class); 

    job2.setOutputKeyClass(Text.class); 
    job2.setOutputValueClass(MyArrayWritable.class); 

    Job j1 = new Job(job); 
    Job j2 = new Job(job2); 

    JobControl jbcntrl = new JobControl ("jbcntrl"); 

    jbcntrl.addJob(j1); 
    jbcntrl.addJob(j2); 

    j2.addDependingJob(j1); 
    jbcntrl.run();   

    System.out.println("********* Second Job Completed**********");  

    return 0; 
} 

public static void main(String[] args) throws Exception 
{ 
    if (args.length != 2) 
    { 
     System.err.println("Enter valid number of arguments <Inputdirectory> <Outputlocation>"); 
     System.exit(0); 
    } 

    int res = ToolRunner.run(new Configuration(), new WordCount(), args); 
    System.exit(res); 
} 

を完了メッセージ第二の仕事を印刷取得されていません。

答えて

0

私は同じ問題に直面しました。私はこの問題を解決するために以下のコードを使用しました。これを使用して、ジョブ制御の待ち時間を追加します。その時間が経過するのを待って終了します。

public class JobHandler { 

    public static void handleRun(JobControl control) { 
     JobRunner runner = new JobRunner(control); 
     Thread t = new Thread(runner); 
     t.start(); 

     while (!control.allFinished()) { 
      System.out.println("Still running..."); 
      try { 
       Thread.sleep(5000); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 

} 


JobHandler.handleRun(jobControl); 
関連する問題