2016-08-23 10 views
0

HDFS上にいくつかのtarファイルがあります。私の目的は、これらのファイルを抽出することです& HDFSに抽出されたファイルが格納されます。例のため例外の取得:IOException入力バッファがtarファイルの抽出中に閉じられた例外です

これは私の入力ディレクトリ構造である(HDFS)。

-------- 
| Output | 
-------- 
    | 
    |----- xyz.gz 
    |----- xyz2.gz 

私のコードは、これらのtarファイルを抽出し、HDFS上のパスにそれらのファイルを格納します。

Path : /data/160823 --> 
-------- 
| 160823 | 
-------- 
    | 
    | --- 00 
     |----- xyz.tar 
     |----- xyz2.tar 

    | --- 01 
     |----- xyz3.tar 
     |----- abc2.tar 

    | --- 02 
     |----- abc3.tar 
     |----- abc4.tar 

    . 
    . 
    . 
    --- 23 
     |----- pqr.tar 
     |----- pqr2.tar 

の予想される出力は次のようになります。

私は最初の.tarファイル&をHDFSに保存することもできますが、その後、次の.tarファイルを読み込んでいる間にこの例外が発生します。

java.io.IOException: input buffer is closed 
    at org.apache.commons.compress.archivers.tar.TarBuffer.readRecord(TarBuffer.java:190) 
    at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.getRecord(TarArchiveInputStream.java:302) 
    at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.getNextTarEntry(TarArchiveInputStream.java:230) 
    at com.lsr.TarMapper.call(TarMapper.java:53) 
    at com.lsr.TarMapper.call(TarMapper.java:1) 
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129) 
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

この私のコードスニペット、

import java.util.ArrayList; 
import java.util.List; 
import java.io.File; 
import java.io.FileOutputStream; 
import java.io.OutputStream; 
import java.net.URI; 
import org.apache.commons.compress.archivers.tar.TarArchiveEntry; 
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataInputStream; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.FileStatus; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IOUtils; 
import org.apache.spark.api.java.function.FlatMapFunction; 
import com.utils.FileWrapper; 

public class TarMapper implements FlatMapFunction<String, String>{ 

    public Iterable<String> call(String arg0) throws Exception { 
     System.out.println("Arg0 : "+arg0); 
     List<String> untarFile = new ArrayList<String>(); 
     FileSystem fileSystem = LTar.fs; 
     FSDataInputStream fsin = null; 
     TarArchiveInputStream tarin = null; 
     OutputStream outstr = null; 
     TarArchiveEntry tarentry = null; 
     FSDataOutputStream fsDataOutputStream = null; 
     Path outputPath = null; 
     try{ 
      fileSystem = FileSystem.get(LTar.conf); 
      fsin = fileSystem.open(new Path(arg0)); 
      tarin = new TarArchiveInputStream(fsin); 
      tarentry = tarin.getNextTarEntry(); 
      while (tarentry != null) { 
       if (!tarentry.isDirectory()) { 
        System.out.println("TAR ENTRY : "+tarentry); 
        outputPath = new Path("/data/tar/"+tarentry.getName().substring(2)); 
        fsDataOutputStream = fileSystem.create(outputPath); 
        System.out.println("Name : "+tarentry.getName()+"Other : "); 
        IOUtils.copyBytes(tarin, fsDataOutputStream, LTar.conf); 
       } 
       tarentry = tarin.getNextTarEntry(); 
      } 
     }catch (Exception e) { 
      e.printStackTrace(); 
     } finally { 
      if (tarin != null) { 
       tarin.close(); 
      } 
      if (fsin != null) { 
       fsin.close(); 
      } 
      if (fileSystem != null) { 
       fileSystem.close(); 
      } 
      if(outstr !=null){ 
       outstr.close(); 
      } 
      if(fsDataOutputStream != null){ 
       fsDataOutputStream.close(); 
      } 
     } 
     return untarFile; 
    } 
} 

は、この問題に関するあなたの提案を入力してください。

答えて

1

あなたが呼び出しているcopyBytes()のオーバーロードは、コピーの最後に入力ストリームを閉じます。

もう1つを使用してください。

+0

これは機能します。ありがとう@EJP、私はIOUtilsパッケージのドキュメントに、ブール引数を持つ別のcopyBytes()関数が見つかりました。これですべての.tarファイルを抽出できます。ここにその構文があります。 copyBytes(InputStream in、OutputStream out、コンフィギュレーションconf、boolean close) –

関連する問題