2016-10-03 21 views
0

S3バケットとHDFSから複数のディレクトリ(つまり複数のパス)を読み込む必要があるSparkアプリケーションで作業しています。 newHadoopAPIは、Lzoの圧縮/索引付けされたファイルを優れた方法で読み込むための優れた方法を提供しています。しかし、newHadoopAPIを使用して複数のフォルダパス/ディレクトリにRDDのいくつかのLzoファイルとIndexファイルをどのように読み込むのですか?newhadoopAPIの複数の入力パスでLZOファイルを読み込むためのファイル

フォルダ構造は、2つの列に分割ハイブテーブルのようなものです。 例:以下のとおりです。日付とバッチのパーティション

/rootDirectory/date=20161002/batch=5678/001_0.lzo /rootDirectory/date=20161002/batch=5678/001_0.lzo.index /RootDirectoryは/日付= 20161002 /バッチ= 56​​78/002_0.lzo /rootDirectory/date=20161002/batch=5678/002_0.lzo.index /rootDirectory/date=20161002/batch=8765/001_0.lzo /RootDirectoryは/日付= 20161002 /バッチ= 8765 /001_0.lzo.index /rootDirectory/date=20161002/batch=8765/002_0.lzo /rootDirectory/date=20161002/batch=8765/002_0.lzo.index

.....など。

私はS3からデータを読み取るのに以下のコードを使用します。これは、LzoとLzo.Indexの両方のファイルを入力として処理します。私のアプリケーションはクラッシュしますが、.lzo.indexファイルは読み込みたくありませんが、速度の指標を使用する.lzoファイルだけです。

val impInput = sparkSession.sparkContext.newAPIHadoopFile("s3://my-bucket/myfolder/*/*", classOf[NonSplittableTextInputFormat],classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.Text]) 
    val impRDD = impInput.map(_._2.toString) 

どうすればいいですか?

1)。私の利益のために.indexファイルを利用できるように、newHadoopAPIを使ってLzoファイルのルートの下にあるすべての(複数の)フォルダを読んでください。

2)。同様の方法でHDFSからデータを読み込みます。

+0

このhttps://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%[email protected].com%3E –

+0

感謝を試してみてください@AyanGuha - しかし、これは動作していないようです。この例では、78 GB(.lzo)の大きな肉のファイルがあり、インデックスを使用してそれを読み取るために使用しています。 私の場合、いくつかの小さなファイルがあり、小さなファイルが多数あるためにLzoファイルを使用する利点がありません。階層は上記のとおりです。 これに関するご意見はありますか? –

答えて

0

HDFSパスに接尾辞を追加すると役立ちます。

val impInput = sparkSession.sparkContext.newAPIHadoopFile("s3://my-bucket/myfolder/*/*.lzo", classOf[NonSplittableTextInputFormat],classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.Text]) 
関連する問題