2016-04-18 16 views
0

2つのマッパーを作成しました。Map1とMap2複数のデータソースを持つ2つのマター

Map1- HDFSでseqファイルを読み取り、処理します。

Map2-はHBASEから読み取り、Map1と同じキー、値ペアを生成します。

最後に、それらをReducerAllでマージします。

問題は、マッパーのうち1つだけが実行されていて、ジョブがあらゆる種類のエラーで完了していないことです。最後のマッパーのみが実行されています(例:TableMapReduceUtil)。私が回線TableMapReduceUtilMultipleInputsを交換すると、最後のもの、すなわちMultipleInputsマッパーが実行されます。

私はここで間違っていますか?両方のシナリオでエラーはスローされません。私はまた、処理のためにaddCacheFile()を使用して2つのファイルを読んだが、ここでは関係ないと思う。

私は現象は、次の2行であると信じて
Job job3 = Job.getInstance(config, "Test"); 
if (true) { 


    job3.setJarByClass(Main.class); 


    job3.setMapOutputKeyClass(ImmutableBytesWritable.class); 
    job3.setMapOutputValueClass(ImmutableBytesWritable.class); 
    job3.setOutputKeyClass(ImmutableBytesWritable.class); 
    job3.setOutputValueClass(ImmutableBytesWritable.class); 


    job3.getConfiguration().set("StartDate", c_startDate); 
    job3.getConfiguration().set("EndDate", c_endDate); 


    job3.addCacheFile(new URI(args[8])); 
    job3.getConfiguration().set("abc", args[8].substring(args[8].lastIndexOf("/") + 1)); 

    job3.addCacheFile(new URI(args[9])); 
    job3.getConfiguration().set("xyz", args[9].substring(args[9].lastIndexOf("/") + 1)); 
    job3.setReducerClass(ReducerAll.class); 
    job3.setOutputFormatClass(SequenceFileOutputFormat.class); 

    job3.setNumReduceTasks(10); 

    Scan scan = new Scan(); 
    scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("hbasetable")); 
    scan.setCaching(300); 
    scan.setCacheBlocks(false); 

    MultipleInputs.addInputPath(job3, new Path(args[6]), SequenceFileInputFormat.class, Map1.class); 
    TableMapReduceUtil.initTableMapperJob(
      "hbasetable", 
      scan, 
      Map2.class, 
      ImmutableBytesWritable.class, 
      ImmutableBytesWritable.class, 
      job3); 


    FileOutputFormat.setOutputPath(job3, new Path(args[7])); 
    job3.waitForCompletion(true); 
    if (!job3.waitForCompletion(true)) { 
    return (1); 
    } 

答えて

0

: -

MultipleInputs.addInputPath(job3, new Path(args[6]), SequenceFileInputFormat.class, Map1.class); 
TableMapReduceUtil.initTableMapperJob(
     "hbasetable", 
     scan, 
     Map2.class, 
     ImmutableBytesWritable.class, 
     ImmutableBytesWritable.class, 
     job3); 
  1. あなたは2つのマッパーがあることに言及しているが1つのジョブだけJOB3
  2. があり、マッパーの種類を見てください。 Map1のマッパーは、と異なるからMap2になります。 Map1Mapperあり、そしてMap2は、それらが本質的JOB3ためMultipleInputs セットアップにこん棒されていることを意味するものではありません一緒に二つの文を維持TableMapper
  3. です。 MultipleInputsには、のMap1が1つしか設定されていません。 Map2のもう1つの設定はまだ別です。
  4. 実行のために今すぐ。二つの構成のMultipleInputsまたはTableMapReduceUtilの後者は、JOB3の前のものを上書きし、したがって、単一のマッパーは

PSを実行します - 私は私の理解を検証するために、まだだ、これは間違っている場合は私に知らせてください私はここに私のマシンで発表しました。

+0

Ah!今、私は分かる。だから、私は両方のマッパーを実行し、同じマッパーに指示したい場合は、別の仕事を定義する必要がありますか?しかし、それでは、私は同じ還元剤にどのように与えることができますか? – gambit

+0

それを同じ還元剤に送る* – gambit

+0

@ガンビット私たちは3つの仕事を使うことができると思います。マッパーだけを持つ最初の2つのジョブと、実際に入力を「そのまま」出力し、次にこれらのファイルに対して実際にマージ・タスクを実行するレデューサーを作成するプレーン・マッパーを使用する3番目のジョブ(マージ・ステップ)。私はまだ良い方法でそれを行う方法を考えています(より良いパフォーマンスを提供します)。これが役に立ったら教えてください。私が何か良いものを見つけたら投稿します。 – Amal

関連する問題