0
HDFS上にいくつかのtarファイルがあります。私の目的は、これらのファイルを抽出することです& HDFSに抽出されたファイルが格納されます。例のため例外の取得:IOException入力バッファがtarファイルの抽出中に閉じられた例外です
:
これは私の入力ディレクトリ構造である(HDFS)。
--------
| Output |
--------
|
|----- xyz.gz
|----- xyz2.gz
私のコードは、これらのtarファイルを抽出し、HDFS上のパスにそれらのファイルを格納します。
Path : /data/160823 -->
--------
| 160823 |
--------
|
| --- 00
|----- xyz.tar
|----- xyz2.tar
| --- 01
|----- xyz3.tar
|----- abc2.tar
| --- 02
|----- abc3.tar
|----- abc4.tar
.
.
.
--- 23
|----- pqr.tar
|----- pqr2.tar
の予想される出力は次のようになります。
私は最初の.tarファイル&をHDFSに保存することもできますが、その後、次の.tarファイルを読み込んでいる間にこの例外が発生します。
java.io.IOException: input buffer is closed
at org.apache.commons.compress.archivers.tar.TarBuffer.readRecord(TarBuffer.java:190)
at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.getRecord(TarArchiveInputStream.java:302)
at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.getNextTarEntry(TarArchiveInputStream.java:230)
at com.lsr.TarMapper.call(TarMapper.java:53)
at com.lsr.TarMapper.call(TarMapper.java:1)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
この私のコードスニペット、
import java.util.ArrayList;
import java.util.List;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.spark.api.java.function.FlatMapFunction;
import com.utils.FileWrapper;
public class TarMapper implements FlatMapFunction<String, String>{
public Iterable<String> call(String arg0) throws Exception {
System.out.println("Arg0 : "+arg0);
List<String> untarFile = new ArrayList<String>();
FileSystem fileSystem = LTar.fs;
FSDataInputStream fsin = null;
TarArchiveInputStream tarin = null;
OutputStream outstr = null;
TarArchiveEntry tarentry = null;
FSDataOutputStream fsDataOutputStream = null;
Path outputPath = null;
try{
fileSystem = FileSystem.get(LTar.conf);
fsin = fileSystem.open(new Path(arg0));
tarin = new TarArchiveInputStream(fsin);
tarentry = tarin.getNextTarEntry();
while (tarentry != null) {
if (!tarentry.isDirectory()) {
System.out.println("TAR ENTRY : "+tarentry);
outputPath = new Path("/data/tar/"+tarentry.getName().substring(2));
fsDataOutputStream = fileSystem.create(outputPath);
System.out.println("Name : "+tarentry.getName()+"Other : ");
IOUtils.copyBytes(tarin, fsDataOutputStream, LTar.conf);
}
tarentry = tarin.getNextTarEntry();
}
}catch (Exception e) {
e.printStackTrace();
} finally {
if (tarin != null) {
tarin.close();
}
if (fsin != null) {
fsin.close();
}
if (fileSystem != null) {
fileSystem.close();
}
if(outstr !=null){
outstr.close();
}
if(fsDataOutputStream != null){
fsDataOutputStream.close();
}
}
return untarFile;
}
}
は、この問題に関するあなたの提案を入力してください。
これは機能します。ありがとう@EJP、私はIOUtilsパッケージのドキュメントに、ブール引数を持つ別のcopyBytes()関数が見つかりました。これですべての.tarファイルを抽出できます。ここにその構文があります。 copyBytes(InputStream in、OutputStream out、コンフィギュレーションconf、boolean close) –