JavaではMapReduceを使っていくつかのデータをインポートしましたtsvファイル(約21 * 10^6行)をHBaseテーブルに追加します。
すべての行がある:
XYZ | XZS YSY | SDS | XDA | JKX | SDS 0.XXXXXXXXX
HTable 5カラムファミリーた:A、B、C、D、E
HBaseへの一括読み込み:エラー:java.lang.ClassCastException:org.apache.hadoop.io.FloatWritableはorg.apache.hadoop.hbase.Cellにキャストできません
最初ファイルの恋人のカップルは私のHBaseのrowkeyです。 - | SDS | XDA | JKX | SDS | XDA | | JKX>コラム家族のため
- YSY:
5の第2のグループは5列の修飾子です - >コラムファミリーB用
- YSY | SDS | XDA- >列ファミリC
- YSY用| SDS - >列の家族のためにE
- >コラムファミリーD - YSY について
最後に挿入する値セルを横切る。 私はΣの合計値を同じ修飾子(1または2または3または4または5)で集計しました(これは私のReducerの一部です)。
これは私のドライバです:私のマッパーがある
public class Driver {
private static final String COLUMN_FAMILY_1 = "A";
private static final String COLUMN_FAMILY_2 = "B";
private static final String COLUMN_FAMILY_3 = "C";
private static final String COLUMN_FAMILY_4 = "D";
private static final String COLUMN_FAMILY_5 = "E";
private static final String TABLENAME = "abe:data";
private static final String DATA_SEPARATOR = "\t";
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = HBaseConfiguration.create();
//Configuration Settings about hbase table
configuration.set("hbase.table.name", TABLENAME);
configuration.set("colomn_family_1", COLUMN_FAMILY_1);
configuration.set("colomn_family_2", COLUMN_FAMILY_2);
configuration.set("colomn_family_3", COLUMN_FAMILY_3);
configuration.set("colomn_family_4", COLUMN_FAMILY_4);
configuration.set("colomn_family_5", COLUMN_FAMILY_5);
configuration.set("data_separator", DATA_SEPARATOR);
if (args.length!= 2){
System.out.println("Usage: ");
System.out.println("-\t args[0] -> HDFS input path");
System.err.println("-\r args[1] -> HDFS output path ");
System.exit(1);
}
String inputPath = args[0];
String outputPath = args[1];
Path inputHdfsPath = new Path(inputPath);
Path outputHdfsPath = new Path(outputPath);
Job job = null;
try {
job = Job.getInstance(configuration);
} catch (IOException e) {
System.out.println("\n\t--->Exception: Error trying getinstance of job.<---\n");
e.printStackTrace();
}
job.setJobName("Bulk Loading HBase Table: "+ "\""+ TABLENAME+"\" with aggregation.");
job.setJarByClass(Driver.class);
//MAPPER
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(MappingClass.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(FloatWritable.class);
try {
FileInputFormat.addInputPath(job, inputHdfsPath);
} catch (IllegalArgumentException | IOException e) {
System.out.println("Error setting inputPath in FileInputFormat");
e.printStackTrace();
}
try {
FileSystem.get(configuration).delete(outputHdfsPath, true);
} catch (IllegalArgumentException | IOException e) {
System.out.println("");
e.printStackTrace();
}
//Setting output FileSystem.Path to save HFile to bulkImport
FileOutputFormat.setOutputPath(job, outputHdfsPath);
FileSystem hdfs;
//Deleting output folder if exists
try {
hdfs = FileSystem.get(configuration);
if(hdfs.exists(outputHdfsPath)){
hdfs.delete(outputHdfsPath, true); //Delete existing Directory
}
} catch (IllegalArgumentException | IOException e) {
e.printStackTrace();
}
//Variables to access to HBase
Connection hbCon = ConnectionFactory.createConnection(configuration);
Table hTable = hbCon.getTable(TableName.valueOf(TABLENAME));
RegionLocator regionLocator = hbCon.getRegionLocator(TableName.valueOf(TABLENAME));
Admin admin = hbCon.getAdmin();
HFileOutputFormat2.configureIncrementalLoad(job, hTable, regionLocator);
// Wait for HFiles creations
boolean result = job.waitForCompletion(true);
LoadIncrementalHFiles loadFfiles = null;
try {
loadFfiles = new LoadIncrementalHFiles(configuration);
} catch (Exception e) {
System.out.println("Error configuring LoadIncrementalHFiles.");
e.printStackTrace();
}
if (result){
loadFfiles.doBulkLoad(outputHdfsPath, admin, hTable, regionLocator);
System.out.println("Bulk Import Completed.");
}
else {
System.out.println("Error in completing job. No bulkimport.");
}
}
}
:
public class MappingClass extends Mapper<LongWritable,Text,ImmutableBytesWritable,FloatWritable>{
private String separator;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Configuration configuration = context.getConfiguration();
separator = configuration.get("data_separator");
}
@Override
public void map(LongWritable key,Text line,Context context){
String[] values = line.toString().split(separator);
String rowkey = values[0];
String[] allQualifiers = values[1].split("\\|");
String percentage = values[2];
System.out.println(percentage);
String toQ1 = new String(allQualifiers[0]+"|"+allQualifiers[1]+"|"+allQualifiers[2]+"|"+allQualifiers[3]+"|"+allQualifiers[4]);
String toQ2= new String(allQualifiers[0]+"|"+allQualifiers[1]+"|"+allQualifiers[2]+"|"+allQualifiers[3]);
String toQ3 = new String(allQualifiers[0]+"|"+allQualifiers[1]+"|"+allQualifiers[2]);
String toQ4 = new String(allQualifiers[0]+"|"+allQualifiers[1]);
String toQ5 = new String(allQualifiers[0]);
ImmutableBytesWritable ibw = new ImmutableBytesWritable();
FloatWritable valueOut = new FloatWritable(Float.parseFloat(percentage));
ibw.set(Bytes.toBytes(new String(rowkey+"_"+toQ1)));
try {
context.write(ibw, valueOut);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
ibw.set(Bytes.toBytes(new String(rowkey+"_"+toQ2)));
try {
context.write(ibw, valueOut);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
ibw.set(Bytes.toBytes(new String(rowkey+"_"+toQ3)));
try {
context.write(ibw, valueOut);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
ibw.set(Bytes.toBytes(new String(rowkey+"_"+toQ4)));
try {
context.write(ibw, valueOut);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
ibw.set(Bytes.toBytes(new String(rowkey+"_"+toQ5)));
try {
context.write(ibw, valueOut);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
これは私のリデューサーです:
public class ReducingClass extends Reducer<ImmutableBytesWritable, FloatWritable, ImmutableBytesWritable, KeyValue> {
private String columnFamily_1;
private String columnFamily_2;
private String columnFamily_3;
private String columnFamily_4;
private String columnFamily_5;
private float sum;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Configuration configuration = context.getConfiguration();
columnFamily_1 = configuration.get("colomn_family_1");
columnFamily_2 = configuration.get("colomn_family_2");
columnFamily_3 = configuration.get("colomn_family_3");
columnFamily_4 = configuration.get("colomn_family_4");
columnFamily_5 = configuration.get("colomn_family_5");
}
@Override
public void reduce(ImmutableBytesWritable key, Iterable<FloatWritable> values, Context context){
String[] rk_cq = key.toString().split("_");
String rowkey = rk_cq[0];
String cq = rk_cq[1];
String colFamily = this.getFamily(cq);
sum = 0;
for(FloatWritable fw : values)
sum += fw.get();
ImmutableBytesWritable ibw = new ImmutableBytesWritable(rowkey.getBytes());
KeyValue kv = new KeyValue(rowkey.getBytes(), colFamily.getBytes(), cq.getBytes(), Float.toString(sum).getBytes());;
try {
context.write(ibw, kv);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
private String getFamily(String cq){
String cf = new String();
switch (cq.split("\\|").length) {
case 1:
cf = columnFamily_1;
break;
case 2:
cf = columnFamily_2;
break;
case 3:
cf = columnFamily_3;
break;
case 4:
cf = columnFamily_4;
break;
case 5:
cf = columnFamily_5;
break;
default:
break;
}
return cf;
}
}
今エラー:任意の助け
17/05/08 20:04:22 INFO mapreduce.Job: map 100% reduce 29%
17/05/08 20:04:22 INFO mapreduce.Job: Task Id : attempt_1485334922305_5537_r_000000_2, Status : FAILED
Error: java.lang.ClassCastException: org.apache.hadoop.io.FloatWritable cannot be cast to org.apache.hadoop.hbase.Cell
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:167)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:150)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
感謝。