2017-04-20 4 views
0

hadoopアーカイブ.harフォーマットで膨大なデータがあります。 harには圧縮が含まれていないので、さらにgzipしてHDFSに保存しようとしています。私が間違いなく働くことができるのは、Sparkを使用してHDFSでHarファイルをGzippingする

harFile.coalesce(1, "true") 
.saveAsTextFile("hdfs://namenode/archive/GzipOutput", classOf[org.apache.hadoop.io.compress.GzipCodec]) 
//`coalesce` because Gzip isn't splittable. 

ですが、これは私に正しい結果を与えません。 Gzippedファイルが生成されますが、出力が無効です(rddタイプなどを示す1行)

助けてください。私は他のアプローチにもオープンしています。

ありがとうございました。

+0

HARアーカイブにはどのような種類のコンテンツ(CSV、JSON、非構造化テキスト(ログなど)、バイナリ)がありますか?各HARのアーカイブを解除し、内部の各ファイルをGZIP形式にして再アーカイブすることを検討しましたか?バイナリではない場合は、各HAR(または複数のHAR)の内容をMRまたはSparkジョブで、単一のGZip形式(またはBZip形式)のファイルにマージすることを検討しましたか?構造化されている場合は、各HAR(または複数のHAR)の内容をParquet ou ORCやGZip圧縮などの列形式にマージすることを検討しましたか? –

+0

@SamsonScharfrichter harには、フラットテキストファイルまたは寄木細工ファイルが含まれます。 xmlsのようなものはありませんが、私はデータを分割したくありません。 harには350以上のディレクトリがあり、各ディレクトリにはファイルが存在するため、各ファイルをgzipすることは問題になります。私はそれをどうやって行うのか分からない。私はPIGを使ってGZip Compressionを使ってその単一のharファイルを圧縮しようとしました。それは圧縮に成功しましたが、GZipは分割可能ではないので、再び望ましくない部分ファイルを作成しました。最後に、各harを別々にgzipする必要があるため、複数のHARをマージすることはできません。 – philantrovert

答えて

1

既存のHDFSファイルの圧縮バージョンを作成するためのJavaコードスニペット。

テキストエディタで、私は少し前に書いたJavaアプリのビットとピースを使っていますので、テストされません。いくつかのタイプミスやギャップが予想される。

// HDFS API 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.security.UserGroupInformation; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.fs.FileStatus; 
// native Hadoop compression libraries 
import org.apache.hadoop.io.compress.CompressionCodecFactory; 
import org.apache.hadoop.io.compress.CompressionCodec; 
import org.apache.hadoop.io.compress.Compressor; 
import org.apache.hadoop.io.compress.GzipCodec; 
import org.apache.hadoop.io.compress.BZip2Codec; 
import org.apache.hadoop.io.compress.SnappyCodec; 
import org.apache.hadoop.io.compress.Lz4Codec; 

.............. 

    // Hadoop "Configuration" (and its derivatives for HDFS, HBase etc.) constructors try to auto-magically 
    // find their config files by searching CLASSPATH for directories, and searching each dir for hard-coded 
    // name "core-site.xml", plus "hdfs-site.xml" and/or "hbase-site.xml" etc. 
    // WARNING - if these config files are not found, the "Configuration" reverts to hard-coded defaults without 
    // any warning, resulting in bizarre error messages later > let's run some explicit controls here 
    Configuration cnfHadoop = new Configuration() ; 
    String propDefaultFs =cnfHadoop.get("fs.defaultFS") ; 
    if (propDefaultFs ==null || ! propDefaultFs.startsWith("hdfs://")) 
    { throw new IllegalArgumentException(
       "HDFS configuration is missing - no proper \"core-site.xml\" found, please add\n" 
       +"directory /etc/hadoop/conf/ (or custom dir with custom XML conf files) in CLASSPATH" 
       ) ; 
    } 
/* 
    // for a Kerberised cluster, either you already have a valid TGT in the default 
    // ticket cache (via "kinit"), or you have to authenticate by code 
    UserGroupInformation.setConfiguration(cnfHadoop) ; 
    UserGroupInformation.loginUserFromKeytab("[email protected]", "/some/path/to/user.keytab") ; 
*/ 
    FileSystem fsCluster =FileSystem.get(cnfHadoop) ; 
    Path source = new Path("/some/hdfs/path/to/XXX.har") ; 
    Path target = new Path("/some/hdfs/path/to/XXX.har.gz") ; 

    // alternative: "BZip2Codec" for better compression (but higher CPU cost) 
    // alternative: "SnappyCodec" or "Lz4Codec" for lower compression (but much lower CPU cost) 
    CompressionCodecFactory codecBootstrap = new CompressionCodecFactory(cnfHadoop) ; 
    CompressionCodec codecHadoop =codecBootstrap.getCodecByClassName(GzipCodec.class.getName()) ; 
    Compressor compressorHadoop =codecHadoop.createCompressor() ; 

    byte[] buffer = new byte[16*1024*1024] ; 
    int bufUsedCapacity ; 
    InputStream sourceStream =fsCluster.open(source) ; 
    OutputStream targetStream =codecHadoop.createOutputStream(fsCluster.create(target, true), compressorHadoop) ; 
    while ((bufUsedCapacity =sourceStream.read(buffer)) >0) 
    { targetStream.write(buffer, 0, bufUsedCapacity) ; } 
    targetStream.close() ; 
    sourceStream.close() ; 

.............. 
+0

答えサムソンに感謝します。試して更新します。 – philantrovert

+0

だから私は試してみましたが、うまくいけば 'har'ファイルはディレクトリであり、ディレクトリを圧縮することはできません。 HDFSで 'har'(' org.apache.commons.compress'を使用)の代わりに 'tar'を作成してgzipすることを提案しますか? – philantrovert

+0

Duh ... HARが本当に変な獣だと思われる。しかし、HDFSがディレクトリであり、個々のファイルにアクセスできるようにするならば、HARから標準の 'java.util.zip.ZipOutputStream'と'。putNextEntry() 'を使って一つのZIPファイルを作ることができるはずです。等) - _免責事項:私は尊敬できるTAR形式の大ファンではない。 –

関連する問題