特定のリデューサー出力の中にさらに多くの出力ファイルを生成する必要があります。 出力データをそれぞれのレデューサーに送るカスタムパーサーを実装しました。しかし、私のレデューサーの中には20GBのデータがあり、15MBのデータしかないものもあります。 1つのレデューサーのように5つの小さな出力ファイルがあるため、レデューサー段階でのデータ処理が高速になります。Custom Partionerのリデューサーあたりの出力ファイル数を設定する方法
私はgoogledと私は私の問題のためにMultiOutputを使用する必要があることがわかりました。しかし、私は使用するために混乱しています。 いくつかの実装を提案してください。
私はHBaseからデータを読み込んでテキストファイルに書き込みます。ここではここ
私のドライバのコードがある
Job job = new Job(hbaseConf);
job.setJarByClass(HBaseToFileDriver.class);
job.setJobName("Importing Data from HBase to File:::" + args[0]);
Scan scan = new Scan();
scan.setFilter(new RowFilter(CompareOp.EQUAL, new SubstringComparator("Japan")));
scan.setCaching(10000); // 1 is the default in Scan, which will be bad
// for
// MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
scan.addFamily(Bytes.toBytes("cf"));
TableMapReduceUtil.initTableMapperJob(args[0], // input table
scan, // Scan instance to control CF and attribute selection
MyMapper.class, // mapper class
Text.class, // mapper output key
IntWritable.class, // mapper output value
job);
job.setReducerClass(MyReducer.class); // reducer class
job.setPartitionerClass(MyPartioner.class);
job.setNumReduceTasks(6); // at least one, adjust as required
//job.setInt("outputs.per.reducer", 4);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
は私のマッパーコード
public class MyMapper extends TableMapper<Text, IntWritable> {
private final IntWritable ONE = new IntWritable(1);
private Text text = new Text();
public void map(ImmutableBytesWritable row, Result value, Context context)
throws IOException, InterruptedException {
String FundamentalSeriesId = new String(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("FundamentalSeriesId")));
String FundamentalSeriesId_objectTypeId = new String(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("FundamentalSeriesId_objectTypeId")));
text.set(FundamentalSeriesId+"|^|"+FundamentalSeriesId_objectTypeId+"|!|");
context.write(text, ONE);
}
}
これは、あなただけの6レデューサーを発するようにしたい場合は、私のpartioner
public class MyPartioner extends Partitioner<Text, IntWritable> {
public int getPartition(Text key, IntWritable value, int setNumRedTask) {
String str = key.toString();
if (str.contains("Japan|2014")) {
return 0;
} else if (str.contains("Japan|2013")) {
return 1;
} else if (str.contains("Japan|2012")) {
return 2;
} else if (str.contains("Japan|2011")) {
return 3;
} else if (str.contains("Japan|2010")) {
return 4;
}
return 5;
}
}