2017-03-08 5 views
0

HDFSの大きなavroファイルを入力ファイルのフィールドの値に基づいて複数のavroファイルに分割しようとしています。以下のセクションでは、マッパー、レデューサー、ドライバープログラムについて説明します。 ...レデューサー出力のパーツファイルの名前付け

代わりに株式-R-00000.avroの

、株式を

は今、すべてが正常に動作しますが、出力ファイルは 01-R-00000.avro、02-R-00000.avroとして命名なっています-r-00001.avro

私は何が欠けていますか?

おかげ

マッパー:

public static class CustomFileSplitMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, AvroKey<String>, AvroValue<GenericRecord>> { 
    @Override 
    public void map(AvroKey<GenericRecord> key, NullWritable value, Context context) 
    throws IOException, InterruptedException { 
     GenericRecord record = key.datum(); 
     LOGGER.info(record); 
     AvroValue<GenericRecord> outValue = new AvroValue<GenericRecord>(record); 
     context.write(new AvroKey<String>((String) record.get("date")), outValue); 
    } 
    } 

リデューサー:

public static class CustomFileSplitReducer extends Reducer<AvroKey<String>, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> { 
    private AvroMultipleOutputs amos; 
    private String outputPath; 

    @Override 
    protected void setup(Context context) { 
     outputPath = context.getConfiguration().get("outputPath"); 
     amos = new AvroMultipleOutputs(context); 
    } 

    @Override 
    public void reduce(AvroKey<String> key, Iterable<AvroValue<GenericRecord>> values, Context context) 
    throws IOException, InterruptedException { 
     for (AvroValue<GenericRecord> value : values) { 
     String datePath = "daily" + File.separator + LocalDate.parse(new String(key.datum().getBytes()), 
      DateTimeFormatter.ofPattern("yyyyMMdd")).format(DateTimeFormatter.ofPattern("yyyy/MM/dd")); 
     GenericRecord record = value.datum(); 
     amos.write("stock", new AvroKey<GenericRecord>(record), NullWritable.get(), 
     outputPath + File.separator + datePath); 
    } 
    } 
    @Override 
    public void cleanup(Context context) throws IOException, InterruptedException { 
    amos.close(); 
    } 
} 

ドライバ:あなたが多出力で一部のファイルに必要なプレフィックスを添付する必要があり

Configuration conf = new Configuration(); 
conf.set("outputPath", props.getString("outputPath")); 
Job job = Job.getInstance(conf, "CustomFileSplitter"); 
job.setJarByClass(CustomFileSplitter.class); 
job.setMapperClass(CustomFileSplitMapper.class); 
job.setReducerClass(CustomFileSplitReducer.class); 
FileInputFormat.addInputPath(job, new Path(props.getString("inputPath"))); 
FileOutputFormat.setOutputPath(job, new Path(props.getString("outputPath"))); 
job.setInputFormatClass(AvroKeyInputFormat.class); 
LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class); 
job.setMapOutputKeyClass(AvroKey.class); 
job.setMapOutputValueClass(AvroValue.class); 
Schema schema = SchemaExtractor.extract(new Path(props.getString("inputPath"))); 

AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING)); 
AvroJob.setMapOutputValueSchema(job, schema); 
AvroJob.setOutputKeySchema(job, schema); 
AvroJob.setInputKeySchema(job, schema); 
AvroMultipleOutputs.addNamedOutput(job, "stock", AvroKeyOutputFormat.class, schema); 

答えて

関連する問題