2016-04-13 17 views
7

私は以下の問題があります。HDFSに格納された複数のファイルを含む圧縮ディレクトリを含むディレクトリがあるとします。私はつまり、型Tの一部のオブジェクトからなるRDDを作成したい:Sparkの圧縮ファイルから全文ファイルを読む

context = new JavaSparkContext(conf); 
JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath); 

JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath); 
JavaRDD<T> processingFiles = filesRDD.map(fileNameContent -> { 
    // The name of the file 
    String fileName = fileNameContent._1(); 
    // The content of the file 
    String content = fileNameContent._2(); 

    // Class T has a constructor of taking the filename and the content of each 
    // processed file (as two strings) 
    T t = new T(content, fileName); 

    return t; 
}); 

inputDataPathが、これは完全に正常に動作したファイルを含むディレクトリが、あるとき、すなわち、それはだときのようなもの:

String inputDataPath = "hdfs://some_path/*/*/"; // because it contains subfolders 

しかし、複数のファイルを含むtgzがある場合、ファイルの内容(fileNameContent._2())は私に役に立たないバイナリ文字列(かなり期待されます)を取得します。私はsimilar question on SOを見つけましたが、解決方法はそれぞれの圧縮が1つのファイルのみで構成されているためです。私の場合は、ファイル全体として個別に読みたい多くのファイルがあります。私もquestionwholeTextFilesを見つけましたが、これは私のケースでは機能しません。

どのようにすればいいですか?

EDIT:

私は(機能testTarballWithFolders()のように、hereから読者をテストしようとしている)hereからリーダーを試してみましたが、私は

TarballReader tarballReader = new TarballReader(fileName); 

を呼び出すと、私はNullPointerExceptionを取得するたびに:

java.lang.NullPointerException 
    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:83) 
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:77) 
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:91) 
    at utils.TarballReader.<init>(TarballReader.java:61) 
    at main.SparkMain.lambda$0(SparkMain.java:105) 
    at main.SparkMain$$Lambda$18/1667100242.call(Unknown Source) 
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

0125358の行は、私は私のポストの編集上部を示したものであり、TarballReaderからのライン61は、上部ラインに入力ストリームinにnull値与える

GZIPInputStream gzip = new GZIPInputStream(in); 

ある:

InputStream in = this.getClass().getResourceAsStream(tarball); 

午前私は正しい道にここにいる?もしそうなら、どうすればいいですか?なぜこのヌル値が得られ、どうすれば修正できますか?

+0

は、Uをしました参照してください:http://stackoverflow.com/questions/24402737/how-to-read-gz-files-in-spark-using-wholetextfiles –

+0

はい、私の質問に書いた - それは私の場合は動作しません。 'パス'では、私は圧縮のパスをもう一度得ます。これは役に立たないです。 – Belphegor

+0

これは "ヌルポインタの例外とは何ですか?それを避けるにはどうすればいいですか"の複製です –

答えて

17

可能な解決策の1つは、binaryFilesでデータを読み取り、内容を手動で抽出することです。

スカラ:ジャワと

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream 
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream 
import org.apache.spark.input.PortableDataStream 
import scala.util.Try 
import java.nio.charset._ 

def extractFiles(ps: PortableDataStream, n: Int = 1024) = Try { 
    val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open)) 
    Stream.continually(Option(tar.getNextTarEntry)) 
    // Read until next exntry is null 
    .takeWhile(_.isDefined) 
    // flatten 
    .flatMap(x => x) 
    // Drop directories 
    .filter(!_.isDirectory) 
    .map(e => { 
     Stream.continually { 
     // Read n bytes 
     val buffer = Array.fill[Byte](n)(-1) 
     val i = tar.read(buffer, 0, n) 
     (i, buffer.take(i))} 
     // Take as long as we've read something 
     .takeWhile(_._1 > 0) 
     .map(_._2) 
     .flatten 
     .toArray}) 
    .toArray 
} 

def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) = 
    new String(bytes, StandardCharsets.UTF_8) 

sc.binaryFiles("somePath").flatMapValues(x => 
    extractFiles(x).toOption).mapValues(_.map(decode())) 
libraryDependencies += "org.apache.commons" % "commons-compress" % "1.11" 

完全な使用例:https://bitbucket.org/zero323/spark-multifile-targz-extract/src

Pythonの

import tarfile 
from io import BytesIO 

def extractFiles(bytes): 
    tar = tarfile.open(fileobj=BytesIO(bytes), mode="r:gz") 
    return [tar.extractfile(x).read() for x in tar if x.isfile()] 

(sc.binaryFiles("somePath") 
    .mapValues(extractFiles) 
    .mapValues(lambda xs: [x.decode("utf-8") for x in xs])) 
+0

答えに感謝しますが、私はJavaで作業します(タグ+コードを参照)。あなたはJavaで答えを提供してください。それは、私はそれをテストし、それが動作するかどうかを確認することはできません(私のコードの残りはJavaにあります)。 – Belphegor

+0

@Belphegorこれは既にJava互換のコードです。 https://bitbucket.org/zero323/spark-multifile-targz-extract – zero323

+0

Arghhh ...私は 'binaryFiles'について何も知らなかった(あなたの答えからそれを見逃した)。とにかく、この方向はトリックです、私は元のコードにいくつかの調整を加えて、それは正常に動作します。ありがとう! – Belphegor

関連する問題