2017-02-03 8 views
2

私は多くの異なるフォルダを持ち、各フォルダの中にはHadoopファイル(part_001など)があります。ディレクトリを考える Sparkを使ってディレクトリからHadoopファイルを再帰的に読み込むには?

directory 
    -> folder1 
     -> part_001... 
     -> part_002... 
    -> folder2 
     -> part_001... 
    ... 

、どのように私は再帰的にこのディレクトリ内のすべてのフォルダの内容を読み、Scalaのを使用してスパークに単一RDDにこのコンテンツをロードすることができますか?

私はこれを見つけたが、それは再帰的に(私はimport org.apache.hadoop.mapreduce.lib.inputを使用しています)サブフォルダ内に進入していません。

var job: Job = null 
    try { 
    job = Job.getInstance() 
    FileInputFormat.setInputPaths(job, new Path("s3n://" + bucketNameData + "/" + directoryS3)) 
    FileInputFormat.setInputDirRecursive(job, true) 
    } catch { 
    case ioe: IOException => ioe.printStackTrace(); System.exit(1); 
    } 
    val sourceData = sc.newAPIHadoopRDD(job.getConfiguration(), classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).values 

私もSequenceFileを使用していますが、どのように再び私は理解していないこと、このweb-pageを見つけました私の場合にそれを適用する?あなたはスパークを使用している場合

+0

あなたは、単純なワイルドカードで試してみましたか?ディレクトリ構造が一貫していれば、それは魅力のように機能するはずです – Chobeat

+0

http://stackoverflow.com/a/27843858/647053を参照してください。 –

+0

@Chobeat:dbustosp( 'var rdd = sc.textFile("パス/ */* ")')は、私が投稿したコードをすべて記述する必要なしに、私が説明したことを直接行いますか? – user7379562

答えて

5

、あなたは次のように使用してこれwilcardsを行うことができます。

scala>sc.textFile("path/*/*") 

SCあなたが火花シェルを使用している場合SparkContextは、デフォルトでは初期化されているか、あなたが作成している場合は、あなたの独自のプログラムは自分でSparkContextをインスタンス化する必要があります。

次のフラグには注意してください:

スカーラ> sc.hadoopConfiguration.get( "mapreduce.input.fileinputformat.input.dir.recursive") RES6:文字列= nullの

ヨはこのフラグをtrueに設定する必要があり:

sc.hadoopConfiguration.set( "mapreduce.input.fileinputformat.input.dir.recursive"、 "真")

+0

だから、私はこれを単にやることができますか? 'val myRDD = sc.textFile(" path/*/* ")'? 'setInputDirRecursive'を使う必要はありませんか?私はストリングのRDDを手に入れますか? (私はストリングのRDDが必要です) – user7379562

+0

はい、まさに正しいです。これはデフォルトでStringとしてロードされ、ワイルドカードを使用する場合はそのフラグは使用されません。 – dbustosp

+0

私が誤解しているもう1つのこと:HadoopファイルのデータにJSON形式のデータがある場合、 'sc.textFile(...)'を実行した後にJSON文字列のRDDを取得します。そして、それをDataFrameに変換するには、このアプローチが有効でしょうか?:val rddFromHadoop = sc.textFile( "path/*/*")import sqlContext.implicits._ var df = rddFromHadoop.toDF() 'または 'toDF()'を適用する前に 'rddFromHadoop'を' RDD [Map [String、String]] 'に解析する必要がありますか?申し訳ありませんが、この追加の質問です。私が 'textFile'を使用してハイドロフィッシュファイルをRDDに読み込むと、私の全体的なアプローチがうまくいくことを理解することが重要です。 – user7379562

1

私は、パラメータは、このように設定しなければならないことを見出した:

.set("spark.hive.mapred.supports.subdirectories","true") 
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive","true") 
関連する問題