2017-04-18 6 views
0

ストームトポロジに入ってくるすべての生データをHDFSクラスタに保存したい。 JSONまたはバイナリデータで、2k /秒の速度で着信します。ストーム - 圧縮を使用してHDFSに書き込む

Iとして、私はHDFSボルト(http://storm.apache.org/releases/0.10.0/storm-hdfs.htmlàを使用しようとしていたが、それは通常のHDFSを使用して圧縮を許可していないボルト 圧縮はシーケンスファイルボルトを使用してのみ可能です。 私はシーケンス・ファイルを使用したくありません私は私のキー/値のものを格納し、私の要求にサービスを提供するために、すでにカサンドラを持っている、本物の鍵を持っている。

プラスしません。 それはちょうど私の生データのためのカサンドラを使用して、あまりにも多くのディスク(オーバーヘッド)を取る(ないこの記事をこれについて議論する目的)。

誰でも私にそれを助けることができますか? 私はjavaのHadoopドライバクライアントはそれを達成するために? 誰かにそのコードスニペットを持っていますか?

+0

ないにしてくださいあなたたち からのフィードバックを持っています//DATADIRですでに圧縮されたファイルを削除する必要が

希望を終えましたファイルはありますが鍵の欠如があなたを止めるべきではありません。 NullWritableをキーとして使用できます。 – Venkat

答えて

0

いいえ、私が望んでいたように、飛行機で圧縮する方法はありません。 しかし、私は解決策を見つけました。もし誰かがそれを必要とするなら、私はここでそれを共有します。

この問題はStormに関連するだけでなく、より一般的なHadoopの質問です。

すべての私のデータはHdfsBoltを使用してwrittedされています

RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|"); 

    //Synchronize data buffer with the filesystem every 1000 tuples 
    // Need to be configurable 
    SyncPolicy syncPolicy = new CountSyncPolicy(1000); 

    // Rotate data files when they reach five MB 
    // need to be configuration 
    FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(10.0f, FileSizeRotationPolicy.Units.MB); 

    // Use default, Storm-generated file names 
    FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/datadir/in_progress") ; 

    // Instantiate the HdfsBolt 
    HdfsBolt bolt = new HdfsBolt() 
     .withFsUrl("hdfs://"+dfsHost+":"+dfsPort) 
     .withFileNameFormat(fileNameFormat) 
     .withRecordFormat(format) 
     .withRotationPolicy(rotationPolicy) 
     .withSyncPolicy(syncPolicy) 
     .addRotationAction(new MoveFileAction().withDestination("/datadir/finished")); 

これは、取り扱いが..容易ではない私に私のボルトの執行につき一つのファイルを与えているが、それは大丈夫です:)

それから私は、自動スケジュール私はまだ1つの問題がある

ここ
hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar \ 
     -Dmapred.reduce.tasks=0 \ 
     -Dmapred.output.compress=true \ 
     -Dmapred.compress.map.output=true \ 
     -Dmapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec \ 
     -input /datadir/finished \ 
     -output /datadir/archives \ 
     -mapper /bin/cat \ 
     -inputformat org.apache.hadoop.mapred.TextInputFormat \ 
     -outputformat org.apache.hadoop.mapred.TextOutputFormat 

:(このような名前ノードか何か上のcronで)のHadoopストリーミングを使用して圧縮が 一方の入力ファイルがあります1つのアーカイブに圧縮します。 10MBの入力ファイル(1人の作業者ごと)が1MBのgzip(またはbzip)に圧縮しています - >これは非常に多くの小さなファイルを生成しており、ハープープの問題です

この問題を解決するには、 hadoopアーカイブ(HAR)の機能性を見てみてください。

はまた、私はタッチで私はシーケンスをお勧めしますことを

よろしく、 バスティアン

関連する問題