2013-07-26 4 views
9

私は膨大な数の小さなファイルを持っています。CombineFileInputFormatを使用してファイルをマージし、各ファイルのデータがMRジョブの単一レコードとして渡されるようにしたいと考えています。http://yaseminavcular.blogspot.in/2011/03/many-small-input-files.html を参考にして、それを新しいAPIに変換しようとしています。(タイトル: hadoopで単一のレコードとしてファイルを読む)

次の2つの問題に直面しています:

a) 2つの小さなファイルでテストしていますが、それでも2つのマッパーが起動されます。1つになると期待していました。

b) 各行が単一のレコードとして渡されますが、私はファイル全体を単一のレコードとして受け取りたいです。

お手数をおかけしますが、以下のコードをご覧ください。私はまだHadoopの初心者です。

ドライバクラス

/**
 * Driver for the map-only "Enron MR" job.
 *
 * <p>Expects two arguments: the input path and the output path. The job reads
 * its input through {@link RawCombineFileInputFormat} and writes text output.
 */
public class MRDriver extends Configured implements Tool {

    /**
     * Configures and submits the job, then blocks until it finishes.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 when the job succeeds, 1 when it fails, 2 when the arguments are missing
     * @throws Exception if job setup or submission fails
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Usage: MRDriver <input path> <output path>");
            return 2;
        }

        FileSystem fs = new Path(".").getFileSystem(getConf());
        fs.printStatistics();

        Job job = new Job(getConf());
        job.setJobName("Enron MR");
        job.setJarByClass(EnronMailReadMapper.class);
        job.setMapperClass(EnronMailReadMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(0);

        // Calling the static addInputPath() through RawCombineFileInputFormat only
        // registers the path; it does not select the input format. Without this
        // explicit call the job falls back to its default input format, so the
        // custom combine format (and its record reader) is never used.
        job.setInputFormatClass(RawCombineFileInputFormat.class);
        RawCombineFileInputFormat.addInputPath(job, new Path(args[0]));

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Command-line entry point; delegates to {@link ToolRunner} and exits with
     * the code returned by {@link #run(String[])}.
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MRDriver(), args);
        System.exit(exitCode);
    }

}

以下のクラスは、LineRecordReaderをほぼコピー&ペーストしたもので、主にinitialize()関数とnextKeyValue()関数を変更しています。

/**
 * Record reader that returns the complete contents of one {@link FileSplit}
 * as a single record.
 *
 * <p>The key is the byte offset at which the record starts and the value is
 * every line of the split, each terminated with {@code '\n'}. The first call
 * to {@link #nextKeyValue()} consumes the whole split; later calls return
 * {@code false}.
 */
public class SingleFileRecordReader extends RecordReader<LongWritable, Text> {
    private static final Log LOG = LogFactory.getLog(SingleFileRecordReader.class);

    /** Offset of the first byte this reader is responsible for. */
    private long start;
    /** Offset of the next byte to be read. */
    private long pos;
    /** Offset just past the last byte of the split. */
    private long end;
    private LineReader in;
    /** Value of "mapred.linerecordreader.maxlength"; defaults to Integer.MAX_VALUE. */
    private int maxLineLength;
    private LongWritable key = null;
    private Text value = null;

    /**
     * Opens the file behind the split and positions the reader at its start.
     *
     * @param genericSplit the split to read; must be a {@link FileSplit}
     * @param context      the task context supplying the configuration
     * @throws IOException if the file cannot be opened or positioned
     */
    @Override
    public void initialize(InputSplit genericSplit,
                           TaskAttemptContext context) throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();

        // open the file and seek to the start of the split
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);
        boolean initialized = false;
        try {
            fileIn.seek(start);
            in = new LineReader(fileIn, job);
            // If this is not the first split, we always throw away the first
            // record because the reader of the previous split reads one extra line.
            if (start != 0) {
                start += in.readLine(new Text(), 0, maxBytesToConsume(start));
            }
            this.pos = start;
            initialized = true;
        } finally {
            if (!initialized) {
                // Do not leak the stream when seeking or the first read fails.
                in = null;
                try {
                    fileIn.close();
                } catch (IOException ignored) {
                    // The failure that brought us here is the one worth reporting.
                }
            }
        }
    }

    /** Number of bytes between {@code pos} and the end of the split, capped at Integer.MAX_VALUE. */
    private int maxBytesToConsume(long pos) {
        return (int) Math.min(Integer.MAX_VALUE, end - pos);
    }

    private long getFilePosition() throws IOException {
        return pos;
    }

    /**
     * Reads every remaining line of the split into one record.
     *
     * <p>Previously the loop stopped after the first line, so each line was
     * delivered as its own record. Now the loop only stops at the end of the
     * split or the end of the file. Lines whose consumed size reaches
     * {@code maxLineLength} are logged and left out of the record.
     *
     * @return {@code true} if a record was produced, {@code false} once the
     *         split is exhausted
     * @throws IOException if reading from the underlying stream fails
     */
    @Override
    public boolean nextKeyValue() throws IOException {
        final long recordStart = pos;
        final Text line = new Text();
        final StringBuilder contents = new StringBuilder();
        boolean sawLine = false;

        // We always read one extra line, which lies outside the upper
        // split limit i.e. (end - 1)
        while (getFilePosition() <= end) {
            int newSize = in.readLine(line, maxLineLength,
                    Math.max(maxBytesToConsume(pos), maxLineLength));
            if (newSize == 0) {
                break; // end of file
            }
            pos += newSize;
            if (newSize >= maxLineLength) {
                // line too long. skip it and carry on with the next one
                LOG.info("Skipped line of size " + newSize + " at pos " +
                        (pos - newSize));
                continue;
            }
            contents.append(line.toString()).append('\n');
            sawLine = true;
        }

        if (!sawLine) {
            key = null;
            value = null;
            return false;
        }

        if (key == null) {
            key = new LongWritable();
        }
        key.set(recordStart);
        value = new Text(contents.toString());
        return true;
    }

    /** @return the start offset of the current record, or {@code null} if there is none */
    @Override
    public LongWritable getCurrentKey() {
        return key;
    }

    /** @return the contents of the current record, or {@code null} if there is none */
    @Override
    public Text getCurrentValue() {
        return value;
    }

    /**
     * Get the progress within the split
     */
    @Override
    public float getProgress() throws IOException {
        if (start == end) {
            return 0.0f;
        }
        return Math.min(1.0f,
                (getFilePosition() - start) / (float) (end - start));
    }

    /** Closes the underlying line reader if it was opened. */
    @Override
    public synchronized void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }

}

その他のファイル

public class RawCombineFileInputFormat extends CombineFileInputFormat <LongWritable,Text>{ 

@Override 
public RecordReader<LongWritable, Text> createRecordReader(
     InputSplit split, TaskAttemptContext context) throws IOException { 
    return new CombineFileRecordReader< LongWritable, Text >((CombineFileSplit) split, context, MultiFileRecordReader.class); 
} 

}

そして

/**
 * Reads one file of a {@link CombineFileSplit} by delegating all work to a
 * {@link SingleFileRecordReader}. The file is identified by its index within
 * the combined split.
 */
public class MultiFileRecordReader extends RecordReader<LongWritable, Text> {

    private CombineFileSplit combinedSplit;
    private TaskAttemptContext taskContext;
    private int fileIndex;
    private RecordReader<LongWritable, Text> delegate;

    /**
     * @param split   the combined split containing the file to read
     * @param context the context of the running task
     * @param index   position of the file inside {@code split}
     */
    public MultiFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) {
        combinedSplit = split;
        taskContext = context;
        fileIndex = index;
        delegate = new SingleFileRecordReader();
    }

    /**
     * Builds a {@link FileSplit} for the selected file and initializes the
     * delegate reader with it.
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        combinedSplit = (CombineFileSplit) split;
        taskContext = context;

        if (delegate == null) {
            delegate = new SingleFileRecordReader();
        }

        final FileSplit singleFile = new FileSplit(
                combinedSplit.getPath(fileIndex),
                combinedSplit.getOffset(fileIndex),
                combinedSplit.getLength(fileIndex),
                combinedSplit.getLocations());
        delegate.initialize(singleFile, taskContext);
    }

    /** Advances the delegate reader. */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        return delegate.nextKeyValue();
    }

    /** @return the key currently held by the delegate reader */
    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return delegate.getCurrentKey();
    }

    /** @return the value currently held by the delegate reader */
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return delegate.getCurrentValue();
    }

    /** @return the progress reported by the delegate reader */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return delegate.getProgress();
    }

    /** Closes the delegate reader once and drops the reference to it. */
    @Override
    public void close() throws IOException {
        if (delegate == null) {
            return;
        }
        delegate.close();
        delegate = null;
    }

}

+0

、ファイルが結合されます。あなたは 'CombineFileInputFormat'のドキュメントでそれを読むことができます。 –

答えて

10

こちらの入力フォーマットを見てみてください。これは、1つのマップタスクで複数のファイルを読み込むための入力フォーマットです。マッパーに渡される各レコードは、ちょうど1つの(分割されていない)ファイルを読み取ったものです。WholeFileRecordReaderが、1つのファイルの内容を1つの値として送信します。返されるキーはNullWritableで、値は各ファイルの内容全体です。これを使ってMapReduceジョブを実行し、実際に実行されているマッパーの数と、得られた出力が正しいかどうかを確認することができます。

レコードはWholeFileRecordReaderによって作成されます。入力フォーマットは次のとおりです:

/**
 * Combine input format whose records are whole files: keys are
 * {@link NullWritable} and each value is produced by a
 * {@link WholeFileRecordReader}.
 */
public class WholeFileInputFormat extends CombineFileInputFormat<NullWritable, Text> {

    /**
     * Input files are never split.
     *
     * @return always {@code false}
     */
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    /**
     * Builds the reader for a combined split; every file in the split is read
     * through its own {@link WholeFileRecordReader}.
     *
     * @param split   the split to read; must be a {@link CombineFileSplit}
     * @param context the context for this task
     * @return a {@link CombineFileRecordReader} over the files of {@code split}
     * @throws IllegalArgumentException if {@code split} is not a {@link CombineFileSplit}
     * @throws IOException if there is an error
     */
    @Override
    public RecordReader<NullWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException {
        if (split instanceof CombineFileSplit) {
            final CombineFileSplit combined = (CombineFileSplit) split;
            return new CombineFileRecordReader<NullWritable, Text>(
                    combined, context, WholeFileRecordReader.class);
        }
        throw new IllegalArgumentException("split must be a CombineFileSplit");
    }

}

上記のクラスは、次のようなWholeFileRecordReaderを使用しています: -

/**
 * Record reader that delivers one whole file as a single record: the key is
 * {@link NullWritable} and the value is the file content as {@link Text}.
 */
public class WholeFileRecordReader extends RecordReader<NullWritable, Text> {
    private static final Logger LOG = Logger.getLogger(WholeFileRecordReader.class);

    /** The path to the file to read. */
    private final Path mFileToRead;
    /** The length of this file. */
    private final long mFileLength;

    /** The Configuration. */
    private final Configuration mConf;

    /** Whether this file has been processed. */
    private boolean mProcessed;
    /** Single Text to store the content of this file (the value) when it is read. */
    private final Text mFileText;

    /**
     * Implementation detail: This constructor is built to be called via
     * reflection from within CombineFileRecordReader.
     *
     * @param fileSplit The CombineFileSplit that this will read from.
     * @param context The context for this task.
     * @param pathToProcess The path index from the CombineFileSplit to process in this record.
     */
    public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext context,
            Integer pathToProcess) {
        mProcessed = false;
        mFileToRead = fileSplit.getPath(pathToProcess);
        mFileLength = fileSplit.getLength(pathToProcess);
        mConf = context.getConfiguration();

        assert 0 == fileSplit.getOffset(pathToProcess);
        if (LOG.isDebugEnabled()) {
            LOG.debug("FileToRead is: " + mFileToRead.toString());
            LOG.debug("Processing path " + pathToProcess + " out of " + fileSplit.getNumPaths());

            try {
                // Resolve the file system from the path itself, exactly as
                // nextKeyValue() does, instead of using the default file system.
                FileSystem fs = mFileToRead.getFileSystem(mConf);
                assert fs.getFileStatus(mFileToRead).getLen() == mFileLength;
            } catch (IOException ioe) {
                // Best-effort sanity check only; report it instead of hiding it.
                LOG.debug("Could not verify the length of " + mFileToRead, ioe);
            }
        }

        mFileText = new Text();
    }

    /** Clears the buffered file content. */
    @Override
    public void close() throws IOException {
        mFileText.clear();
    }

    /**
     * Returns the key of the current record, which carries no information.
     *
     * @return The {@link NullWritable} singleton.
     * @throws IOException never.
     * @throws InterruptedException never.
     */
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    /**
     * <p>Returns the current value. If the file has been read with a call to nextKeyValue(),
     * this returns the contents of the file as a Text. Otherwise, it returns an
     * empty Text.</p>
     *
     * @return A Text containing the contents of the file to read.
     * @throws IOException never.
     * @throws InterruptedException never.
     */
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return mFileText;
    }

    /**
     * Returns whether the file has been processed or not. Since only one record
     * will be generated for a file, progress will be 0.0 if it has not been processed,
     * and 1.0 if it has.
     *
     * @return 0.0 if the file has not been processed. 1.0 if it has.
     * @throws IOException never.
     * @throws InterruptedException never.
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return mProcessed ? 1.0f : 0.0f;
    }

    /**
     * All of the internal state is already set on instantiation. This is a no-op.
     *
     * @param split The InputSplit to read. Unused.
     * @param context The context for this task. Unused.
     * @throws IOException never.
     * @throws InterruptedException never.
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // no-op.
    }

    /**
     * <p>If the file has not already been read, this reads it into memory, so that a call
     * to getCurrentValue() will return the entire contents of this file as Text. Then, returns
     * true. If it has already been read, then returns false without updating any internal state.</p>
     *
     * @return Whether the file was read or not.
     * @throws IOException if there is an error reading the file, or if the file is
     *         longer than Integer.MAX_VALUE bytes.
     * @throws InterruptedException if there is an error.
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (mProcessed) {
            return false;
        }
        if (mFileLength > (long) Integer.MAX_VALUE) {
            throw new IOException("File is longer than Integer.MAX_VALUE.");
        }
        byte[] contents = new byte[(int) mFileLength];

        FileSystem fs = mFileToRead.getFileSystem(mConf);
        FSDataInputStream in = null;
        try {
            // Set the contents of this file.
            in = fs.open(mFileToRead);
            IOUtils.readFully(in, contents, 0, contents.length);
            mFileText.set(contents, 0, contents.length);
        } finally {
            IOUtils.closeStream(in);
        }
        mProcessed = true;
        return true;
    }

}

そして、ドライバのコードは次のとおりです: -

最大分割サイズを設定する必要があります。
/**
 * Configures and runs the map-only "WholeFile" job.
 *
 * <p>Input is read from {@code arg[0]} through WholeFileInputFormat and the
 * output is written to {@code arg[1]}. The output path is deleted
 * recursively before the job is submitted.
 *
 * <p>NOTE(review): this snippet is truncated in the visible source; the
 * return statement and the closing brace of the method are not shown.
 *
 * @param arg {@code arg[0]} is the input path, {@code arg[1]} the output path
 * @throws Exception with message "Job Failed" when the job does not complete successfully
 */
public int run(String[] arg) throws Exception { 
    Configuration conf=getConf(); 
    FileSystem fs = FileSystem.get(conf); 
    // No reducer estimation happens here: the job is map-only (see setNumReduceTasks(0) below).
    Job job = new Job(conf); 
    job.setJarByClass(WholeFileDriver.class); 
    job.setJobName("WholeFile"); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Text.class); 
    // Select the whole-file input format explicitly.
    job.setInputFormatClass(WholeFileInputFormat.class); 
    job.setMapperClass(WholeFileMapper.class); 
    job.setNumReduceTasks(0); 

    FileInputFormat.addInputPath(job, new Path(arg[0])); 
    Path output=new Path(arg[1]); 
    // Remove any existing output directory; a failure here is only logged.
    try { 
     fs.delete(output, true); 
    } catch (IOException e) { 
     LOG.warn("Failed to delete temporary path", e); 
    } 
    FileOutputFormat.setOutputPath(job, output); 

    // Block until the job finishes and turn a failed job into an exception.
    boolean ret=job.waitForCompletion(true); 
    if(!ret){ 
     throw new Exception("Job Failed"); 
    } 
+0

ありがとうございました、バイナリ... 私はjob.setInputFormatClass(WholeFileInputFormat.class)を追加していませんでした。RawCombineFileInputFormat.addInputPath(job, new Path(args[0]))を追加していたのに、なぜその入力フォーマットが選択されなかったのでしょうか?どなたか分かりますか? –

関連する問題