Bulk loading into HBase: Error: java.lang.ClassCastException: org.apache.hadoop.io.FloatWritable cannot be cast to org.apache.hadoop.hbase.Cell

In Java I wrote a MapReduce job to import some data from a tsv file (about 21 * 10^6 lines) into an HBase table.
Every line has the form:

XYZ|XZS    YSY|SDS|XDA|JKX|SDS    0.XXXXXXXXX

The HTable has 5 column families: A, B, C, D, E.

The first couple (XYZ|XZS) is my HBase rowkey. The second group of 5 pipe-separated tokens gives the 5 column qualifiers, one per column family:

  1. YSY|SDS|XDA|JKX|SDS -> for column family A
  2. YSY|SDS|XDA|JKX     -> for column family B
  3. YSY|SDS|XDA         -> for column family C
  4. YSY|SDS             -> for column family D
  5. YSY                 -> for column family E

The last field is the value to insert into the cells. In my Reducer I aggregate (sum with a Σ) the values that end up with the same qualifier (1 or 2 or 3 or 4 or 5).
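
To make this concrete, here is a minimal stand-alone sketch of how one line breaks down into the rowkey, the five qualifier prefixes and the value (the class name, the sample value 0.123456789 and the printed format are only illustrative; the real parsing is done in the Mapper further down):

    // Illustrative only: how one tsv line yields the rowkey, the five
    // qualifier prefixes and the value (names are mine, not from the job).
    public class LineFormatSketch {

        public static void main(String[] args) {
            String line = "XYZ|XZS\tYSY|SDS|XDA|JKX|SDS\t0.123456789";

            String[] fields = line.split("\t");
            String rowkey = fields[0];                 // "XYZ|XZS"
            String[] tokens = fields[1].split("\\|");  // the 5 qualifier tokens
            String value = fields[2];                  // the percentage to aggregate

            // Build the prefixes YSY, YSY|SDS, ..., YSY|SDS|XDA|JKX|SDS.
            // Each (rowkey + "_" + prefix, value) pair becomes one map output.
            StringBuilder prefix = new StringBuilder();
            for (int i = 0; i < tokens.length; i++) {
                if (i > 0) {
                    prefix.append('|');
                }
                prefix.append(tokens[i]);
                System.out.println(rowkey + "_" + prefix + "\t" + value);
            }
        }
    }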

This is my driver:

    public class Driver { 
    
        private static final String COLUMN_FAMILY_1 = "A"; 
        private static final String COLUMN_FAMILY_2 = "B"; 
        private static final String COLUMN_FAMILY_3 = "C"; 
        private static final String COLUMN_FAMILY_4 = "D"; 
        private static final String COLUMN_FAMILY_5 = "E"; 
        private static final String TABLENAME = "abe:data"; 
        private static final String DATA_SEPARATOR = "\t"; 
    
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 
         Configuration configuration = HBaseConfiguration.create(); 
    
         //Configuration Settings about hbase table 
         configuration.set("hbase.table.name", TABLENAME); 
         configuration.set("colomn_family_1", COLUMN_FAMILY_1); 
         configuration.set("colomn_family_2", COLUMN_FAMILY_2); 
         configuration.set("colomn_family_3", COLUMN_FAMILY_3); 
         configuration.set("colomn_family_4", COLUMN_FAMILY_4); 
         configuration.set("colomn_family_5", COLUMN_FAMILY_5); 
         configuration.set("data_separator", DATA_SEPARATOR); 
    
    
         if (args.length!= 2){ 
          System.out.println("Usage: "); 
          System.out.println("-\t args[0] -> HDFS input path"); 
          System.out.println("-\t args[1] -> HDFS output path"); 
          System.exit(1); 
         } 
    
         String inputPath = args[0]; 
         String outputPath = args[1]; 
         Path inputHdfsPath = new Path(inputPath); 
         Path outputHdfsPath = new Path(outputPath); 
    
         Job job = null; 
    
         try { 
          job = Job.getInstance(configuration); 
         } catch (IOException e) { 
          System.out.println("\n\t--->Exception: Error trying getinstance of job.<---\n"); 
          e.printStackTrace(); 
         } 
    
         job.setJobName("Bulk Loading HBase Table: "+ "\""+ TABLENAME+"\" with aggregation."); 
         job.setJarByClass(Driver.class); 
    
         //MAPPER 
         job.setInputFormatClass(TextInputFormat.class); 
         job.setMapperClass(MappingClass.class); 
         job.setMapOutputKeyClass(ImmutableBytesWritable.class); 
         job.setMapOutputValueClass(FloatWritable.class); 
    
         try { 
    
          FileInputFormat.addInputPath(job, inputHdfsPath); 
    
         } catch (IllegalArgumentException | IOException e) { 
          System.out.println("Error setting inputPath in FileInputFormat"); 
          e.printStackTrace(); 
         } 
    
         try { 
    
          FileSystem.get(configuration).delete(outputHdfsPath, true); 
    
         } catch (IllegalArgumentException | IOException e) { 
          System.out.println(""); 
          e.printStackTrace(); 
         } 
    
         //Setting output FileSystem.Path to save HFile to bulkImport 
         FileOutputFormat.setOutputPath(job, outputHdfsPath);   
         FileSystem hdfs; 
    
    
         //Deleting output folder if exists 
         try { 
    
          hdfs = FileSystem.get(configuration); 
          if(hdfs.exists(outputHdfsPath)){ 
           hdfs.delete(outputHdfsPath, true); //Delete existing Directory 
          } 
    
         } catch (IllegalArgumentException | IOException e) { 
          e.printStackTrace(); 
         } 
    
    
         //Variables to access to HBase 
         Connection hbCon = ConnectionFactory.createConnection(configuration); 
         Table hTable = hbCon.getTable(TableName.valueOf(TABLENAME)); 
         RegionLocator regionLocator = hbCon.getRegionLocator(TableName.valueOf(TABLENAME)); 
         Admin admin = hbCon.getAdmin(); 
         HFileOutputFormat2.configureIncrementalLoad(job, hTable, regionLocator); 
    
         // Wait for HFiles creations 
         boolean result = job.waitForCompletion(true); 
         LoadIncrementalHFiles loadFfiles = null; 
    
         try { 
          loadFfiles = new LoadIncrementalHFiles(configuration); 
         } catch (Exception e) { 
          System.out.println("Error configuring LoadIncrementalHFiles."); 
          e.printStackTrace(); 
         } 
    
         if (result){ 
          loadFfiles.doBulkLoad(outputHdfsPath, admin, hTable, regionLocator); 
          System.out.println("Bulk Import Completed."); 
         } 
         else { 
          System.out.println("Error in completing job. No bulkimport."); 
         } 
    
        } 
    
        } 
    

This is my mapper:

    public class MappingClass extends Mapper<LongWritable,Text,ImmutableBytesWritable,FloatWritable>{ 
         private String separator; 
    
    
         @Override 
         protected void setup(Context context) throws IOException, InterruptedException { 
          Configuration configuration = context.getConfiguration(); 
          separator = configuration.get("data_separator"); 
         } 
    
         @Override 
         public void map(LongWritable key,Text line,Context context){ 
    
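          // parse the tsv line: rowkey <TAB> pipe-separated qualifier string <TAB> percentage value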
          String[] values = line.toString().split(separator); 
          String rowkey = values[0]; 
          String[] allQualifiers = values[1].split("\\|"); 
          String percentage = values[2]; 
          System.out.println(percentage); 
    
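          // build the 5 qualifier prefixes, from all 5 tokens (toQ1) down to the first token only (toQ5)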
          String toQ1 = new String(allQualifiers[0]+"|"+allQualifiers[1]+"|"+allQualifiers[2]+"|"+allQualifiers[3]+"|"+allQualifiers[4]); 
          String toQ2= new String(allQualifiers[0]+"|"+allQualifiers[1]+"|"+allQualifiers[2]+"|"+allQualifiers[3]); 
          String toQ3 = new String(allQualifiers[0]+"|"+allQualifiers[1]+"|"+allQualifiers[2]); 
          String toQ4 = new String(allQualifiers[0]+"|"+allQualifiers[1]); 
          String toQ5 = new String(allQualifiers[0]); 
    
    
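          // emit one (rowkey + "_" + qualifier, percentage) pair per prefix; the reducer sums the values per key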
          ImmutableBytesWritable ibw = new ImmutableBytesWritable(); 
          FloatWritable valueOut = new FloatWritable(Float.parseFloat(percentage)); 
    
          ibw.set(Bytes.toBytes(new String(rowkey+"_"+toQ1))); 
    
          try { 
           context.write(ibw, valueOut); 
          } catch (IOException | InterruptedException e) { 
           e.printStackTrace(); 
          } 
    
    
          ibw.set(Bytes.toBytes(new String(rowkey+"_"+toQ2))); 
    
          try { 
           context.write(ibw, valueOut); 
          } catch (IOException | InterruptedException e) { 
           e.printStackTrace(); 
          } 
    
    
          ibw.set(Bytes.toBytes(new String(rowkey+"_"+toQ3))); 
    
          try { 
           context.write(ibw, valueOut); 
          } catch (IOException | InterruptedException e) { 
           e.printStackTrace(); 
          } 
    
    
          ibw.set(Bytes.toBytes(new String(rowkey+"_"+toQ4))); 
    
          try { 
           context.write(ibw, valueOut); 
          } catch (IOException | InterruptedException e) { 
           e.printStackTrace(); 
          } 
    
    
          ibw.set(Bytes.toBytes(new String(rowkey+"_"+toQ5))); 
    
          try { 
           context.write(ibw, valueOut); 
          } catch (IOException | InterruptedException e) { 
           e.printStackTrace(); 
          } 
    
         } 
    
        } 
    

This is my reducer:

    public class ReducingClass extends Reducer<ImmutableBytesWritable, FloatWritable, ImmutableBytesWritable, KeyValue> { 
         private String columnFamily_1; 
         private String columnFamily_2; 
         private String columnFamily_3; 
         private String columnFamily_4; 
         private String columnFamily_5; 
         private float sum; 
    
         @Override 
         protected void setup(Context context) throws IOException, InterruptedException { 
          Configuration configuration = context.getConfiguration(); 
    
          columnFamily_1 = configuration.get("colomn_family_1"); 
          columnFamily_2 = configuration.get("colomn_family_2"); 
          columnFamily_3 = configuration.get("colomn_family_3"); 
          columnFamily_4 = configuration.get("colomn_family_4"); 
          columnFamily_5 = configuration.get("colomn_family_5"); 
         } 
         @Override 
         public void reduce(ImmutableBytesWritable key, Iterable<FloatWritable> values, Context context){ 
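          // the map output key has the form rowkey + "_" + qualifier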
          String[] rk_cq = key.toString().split("_"); 
          String rowkey = rk_cq[0]; 
          String cq = rk_cq[1]; 
          String colFamily = this.getFamily(cq);   
          sum = 0; 
    
          for(FloatWritable fw : values) 
           sum += fw.get(); 
    
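          // one KeyValue per cell: (rowkey, column family, qualifier) -> summed value, stored as a string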
          ImmutableBytesWritable ibw = new ImmutableBytesWritable(rowkey.getBytes()); 
          KeyValue kv = new KeyValue(rowkey.getBytes(), colFamily.getBytes(), cq.getBytes(), Float.toString(sum).getBytes()); 
    
    
          try { 
           context.write(ibw, kv); 
          } catch (IOException | InterruptedException e) { 
           e.printStackTrace(); 
          } 
    
         } 
    
         private String getFamily(String cq){ 
          String cf = new String(); 
    
          switch (cq.split("\\|").length) { 
          case 1: 
           cf = columnFamily_1; 
           break; 
    
          case 2: 
           cf = columnFamily_2; 
           break; 
    
          case 3: 
           cf = columnFamily_3; 
           break; 
    
          case 4: 
           cf = columnFamily_4; 
           break; 
    
          case 5: 
           cf = columnFamily_5; 
           break; 
    
          default: 
           break; 
          } 
    
          return cf; 
         } 
    
        } 
    

This is the error I get:

    17/05/08 20:04:22 INFO mapreduce.Job: map 100% reduce 29%
    17/05/08 20:04:22 INFO mapreduce.Job: Task Id : attempt_1485334922305_5537_r_000000_2, Status : FAILED
    Error: java.lang.ClassCastException: org.apache.hadoop.io.FloatWritable cannot be cast to org.apache.hadoop.hbase.Cell
     at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:167)
     at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
     at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
     at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
     at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:150)
     at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
     at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
     at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
     at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
     at java.security.AccessController.doPrivileged(Native Method)
     at javax.security.auth.Subject.doAs(Subject.java:422)
     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
     at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)

Any help would be appreciated. Thanks.

Answer:

I fixed it. In the driver I had forgotten:

    job.setReducerClass(ReducingClass.class); 
    
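Why this fixes the ClassCastException: without job.setReducerClass(...) Hadoop runs the default identity Reducer, so the FloatWritable map output values are handed straight to HFileOutputFormat2, which expects Cell/KeyValue values; hence the cast error. As far as I can tell, HFileOutputFormat2.configureIncrementalLoad() only auto-configures a sorting reducer when the map output value class is KeyValue, Put or Text, so with FloatWritable you have to set your own reducer explicitly. A sketch of the relevant part of the driver with the missing line in place (same class names as in the question, not a complete driver):

    //MAPPER / REDUCER wiring
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(MappingClass.class);
    job.setReducerClass(ReducingClass.class);   // <-- the line that was missing
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(FloatWritable.class);

    // unchanged: sets up partitioning and the HFile output; it does not pick
    // a reducer for a FloatWritable map output value class
    HFileOutputFormat2.configureIncrementalLoad(job, hTable, regionLocator);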