2017-02-24 4 views
0

マッパーは、() 2)国による国の統計をソートするユーザが訪れた二つの場所 1)の記事からファイルを読んでいる賢明(国)への書き込みテキスト、アマゾンクラスタHDFSから読むと</p> <p>は、両方のマッパーの出力はHBaseの

のテキスト

私が実行しているプログラムは、私の目的は、二つの異なるセットからのデータを読み取り、その結果を組み合わせて、HBaseの中に格納されます。

HDFS to HDFSが動作しています。 コードは67%の削減で立ち往生して

17/02/24 10:45:31 INFO mapreduce.Job: map 0% reduce 0% 
17/02/24 10:45:37 INFO mapreduce.Job: map 100% reduce 0% 
17/02/24 10:45:49 INFO mapreduce.Job: map 100% reduce 67% 
17/02/24 10:46:00 INFO mapreduce.Job: Task Id : attempt_1487926412544_0016_r_000000_0, Status : FAILED 
Error: java.lang.IllegalArgumentException: Row length is 0 
     at org.apache.hadoop.hbase.client.Mutation.checkRow(Mutation.java:565) 
     at org.apache.hadoop.hbase.client.Put.<init>(Put.java:110) 
     at org.apache.hadoop.hbase.client.Put.<init>(Put.java:68) 
     at org.apache.hadoop.hbase.client.Put.<init>(Put.java:58) 
     at com.happiestminds.hadoop.CounterReducer.reduce(CounterReducer.java:45) 
     at com.happiestminds.hadoop.CounterReducer.reduce(CounterReducer.java:1) 
     at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171) 
     at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:635) 
     at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:390) 
     at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164) 
     at java.security.AccessController.doPrivileged(Native Method) 
     at javax.security.auth.Subject.doAs(Subject.java:422) 
     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) 
     at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158) 

ドライバクラスがあるとエラーになりますなっている

package com.happiestminds.hadoop; 



import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.MasterNotRunningException; 
import org.apache.hadoop.hbase.client.HBaseAdmin; 
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 


public class Main extends Configured implements Tool { 

    /** 
    * @param args 
    * @throws Exception 
    */ 
    public static String outputTable = "mapreduceoutput"; 

    public static void main(String[] args) throws Exception { 
     int exitCode = ToolRunner.run(new Main(), args); 
     System.exit(exitCode); 
    } 

    @Override 
    public int run(String[] args) throws Exception { 


     Configuration config = HBaseConfiguration.create(); 

     try{ 
      HBaseAdmin.checkHBaseAvailable(config); 
     } 
     catch(MasterNotRunningException e){ 
      System.out.println("Master not running"); 
      System.exit(1); 
     } 

     Job job = Job.getInstance(config, "Hbase Test"); 

     job.setJarByClass(Main.class); 

     job.setMapOutputKeyClass(Text.class); 
     job.setMapOutputValueClass(Text.class); 



     MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, ArticleMapper.class); 
     MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, StatisticsMapper.class); 

     TableMapReduceUtil.addDependencyJars(job); 
     TableMapReduceUtil.initTableReducerJob(outputTable, CounterReducer.class, job); 

     //job.setReducerClass(CounterReducer.class); 

     job.setNumReduceTasks(1); 


     return job.waitForCompletion(true) ? 0 : 1; 
    } 

} 

リデューサークラスがある

package com.happiestminds.hadoop; 

import java.io.IOException; 

import org.apache.hadoop.hbase.client.Mutation; 
import org.apache.hadoop.hbase.client.Put; 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 
import org.apache.hadoop.hbase.mapreduce.TableReducer; 
import org.apache.hadoop.hbase.util.Bytes; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 


public class CounterReducer extends TableReducer<Text, Text, ImmutableBytesWritable> { 

    public static final byte[] CF = "counter".getBytes(); 
    public static final byte[] COUNT = "combined".getBytes(); 


    @Override 
    protected void reduce(Text key, Iterable<Text> values, 
      Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context) 
      throws IOException, InterruptedException { 

     String vals = values.toString(); 
     int counter = 0; 

     StringBuilder sbr = new StringBuilder(); 
     System.out.println(key.toString()); 
     for (Text val : values) { 
      String stat = val.toString(); 
      if (stat.equals("***")) { 
       counter++; 
      } else { 
       sbr.append(stat + ","); 
      } 

     } 
     sbr.append("Article count : " + counter); 


     Put put = new Put(Bytes.toBytes(key.toString())); 
     put.addColumn(CF, COUNT, Bytes.toBytes(sbr.toString())); 
     if (counter != 0) { 
      context.write(null, put); 
     } 

    } 



} 

依存

<dependencies> 
     <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-client</artifactId> 
      <version>2.7.3</version> 
     </dependency> 



     <dependency> 
      <groupId>org.apache.hbase</groupId> 
      <artifactId>hbase-client</artifactId> 
      <version>1.2.2</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.hbase</groupId> 
      <artifactId>hbase-common</artifactId> 
      <version>1.2.2</version> 
     </dependency> 


     <dependency> 
      <groupId>org.apache.hbase</groupId> 
      <artifactId>hbase-server</artifactId> 
      <version>1.2.2</version> 
     </dependency> 



    </dependencies> 

答えて

0

null値を挿入しているかどうかを確認できますか?

HBaseデータモデルでは、ゼロ長の行キーを使用できません。少なくとも1バイトである必要があります。

還元コマンドコードをチェックしてから、値の一部がnullに設定されているかどうかにかかわらず、putコマンドを実行してください。

1

あなたの値をどこかに提出する前に検証することをお勧めします。特定のケースではキーsbrを検証するか、適切な通知ポリシーを使用してtry-catchセクションにラップしてください。

try 
{ 
    Put put = new Put(Bytes.toBytes(key.toString())); 
    put.addColumn(CF, COUNT, Bytes.toBytes(sbr.toString())); 
    if (counter != 0) { 
     context.write(null, put); 
    } 
} 
catch (IllegalArgumentException ex) 
{ 
     System.err.println("Error processing record - Key: "+ key.toString() +", values: " +sbr.ToString()); 
} 
0

あなたが得るエラーはかなり自明です:あなたは、いくつかのログに出力して、彼らは正しいものではなく、新しいテスト・ケースであなたのユニットテストを更新した場合にする必要があります。 HBaseの行キーは空にすることはできません(値は可能ですが)。

@Override 
protected void reduce(Text key, Iterable<Text> values, 
     Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context) 
     throws IOException, InterruptedException { 
    if (key == null || key.getLength() == 0) { 
     // Log a warning about the empty key. 
     return; 
    } 
    // Rest of your reducer follows. 
} 
+0

レデューサーが100%で立ち往生しました。 –

1

キーの長さは、キーの長さが0かどうかだけあなたがHBaseのに入れることができている場合ので、HBaseのに入れる前に、あなたがチェックできる0であることは明らかであるプログラムによってスローされた例外によります。

鍵長の0は、それが少なくとも1バイトでなければならない長さ0の行キーを許可しないのHBase

Becuase HBaseのデータモデルによってサポートされていない理由をより明瞭。 0バイトの行キーは、内部使用(空の開始キーと終了キーを指定するため)用に予約されています。

関連する問題