2016-12-21 5 views
0

特定のリデューサー出力の中にさらに多くの出力ファイルを生成する必要があります。 出力データをそれぞれのレデューサーに送るカスタムパーサーを実装しました。しかし、私のレデューサーの中には20GBのデータがあり、15MBのデータしかないものもあります。 1つのレデューサーのように5つの小さな出力ファイルがあるため、レデューサー段階でのデータ処理が高速になります。Custom Partionerのリデューサーあたりの出力ファイル数を設定する方法

私はgoogledと私は私の問題のためにMultiOutputを使用する必要があることがわかりました。しかし、私は使用するために混乱しています。 いくつかの実装を提案してください。

私はHBaseからデータを読み込んでテキストファイルに書き込みます。ここではここ

私のドライバのコードがある

Job job = new Job(hbaseConf); 
    job.setJarByClass(HBaseToFileDriver.class); 
    job.setJobName("Importing Data from HBase to File:::" + args[0]); 

    Scan scan = new Scan(); 
    scan.setFilter(new RowFilter(CompareOp.EQUAL, new SubstringComparator("Japan"))); 
    scan.setCaching(10000); // 1 is the default in Scan, which will be bad 
       // for 
       // MapReduce jobs 
    scan.setCacheBlocks(false); // don't set to true for MR jobs 
    scan.addFamily(Bytes.toBytes("cf")); 

    TableMapReduceUtil.initTableMapperJob(args[0], // input table 
     scan, // Scan instance to control CF and attribute selection 
     MyMapper.class, // mapper class 
     Text.class, // mapper output key 
     IntWritable.class, // mapper output value 
     job); 
    job.setReducerClass(MyReducer.class); // reducer class 
    job.setPartitionerClass(MyPartioner.class); 
    job.setNumReduceTasks(6); // at least one, adjust as required 
    //job.setInt("outputs.per.reducer", 4); 

    FileOutputFormat.setOutputPath(job, new Path(args[1])); 

は私のマッパーコード

public class MyMapper extends TableMapper<Text, IntWritable> { 

    private final IntWritable ONE = new IntWritable(1); 
    private Text text = new Text(); 



    public void map(ImmutableBytesWritable row, Result value, Context context) 
     throws IOException, InterruptedException { 

    String FundamentalSeriesId = new String(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("FundamentalSeriesId"))); 
    String FundamentalSeriesId_objectTypeId = new String(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("FundamentalSeriesId_objectTypeId"))); 

    text.set(FundamentalSeriesId+"|^|"+FundamentalSeriesId_objectTypeId+"|!|"); 

    context.write(text, ONE); 
    } 
} 

これは、あなただけの6レデューサーを発するようにしたい場合は、私のpartioner

public class MyPartioner extends Partitioner<Text, IntWritable> { 

    public int getPartition(Text key, IntWritable value, int setNumRedTask) { 

    String str = key.toString(); 
    if (str.contains("Japan|2014")) { 
     return 0; 
    } else if (str.contains("Japan|2013")) { 
     return 1; 
    } else if (str.contains("Japan|2012")) { 
     return 2; 
    } else if (str.contains("Japan|2011")) { 
     return 3; 
    } else if (str.contains("Japan|2010")) { 
     return 4; 
    } 

     return 5; 

    } 

} 

答えて

0

であり、あなたの場合です年に基づいてデータを配信したい場合は、データ特性に基づいて、一部の減速機私は20Gbを持っていますが、一部は15Mbしか処理できません。

MultipleOutputFormatを使用し、6バケットのデータを配信するセネリオが同じ場合は、同じページに着陸します。

年と他の属性のような別の属性が見つかったり、減速器の数を増やしたり、属性のHashCodeに基づいてパーティショナーを定義する必要があります(この場合も、減速器で処理するデータが増えます) 。

あなただけの参照したい場合は、リンクの下で

http://hadooptutorial.info/mapreduce-multiple-outputs-use-case/

をMultipleOutput例を見つけることができます
関連する問題