2015-10-23 15 views
5

私はSparkが新しく、各入力行にファイル名の列を挿入しようとしています。Sparkの各行にソースファイル名を追加するには?

他の人も同様の質問をしてきましたが、回答はすべてwholeTextFileでしたが、私は大きなCSVファイル(Spark-CSVライブラリを使用して読み込み)、JSONファイル、およびパーケットファイル小さなテキストファイルだけでなく)。

私は、ファイル名のリストを取得するためにspark-shellを使用することができます。

val df = sqlContext.read.parquet("/blah/dir") 
val names = df.select(inputFileName()) 
names.show 

をそれはデータフレームです。 私はそれを各行に列として追加する方法がわかりません(そして、その結果が最初のデータと同じ順序になっていれば、それはいつもそうだと思いますが)。そして、これをすべての入力タイプ。

+0

? –

+1

各レコードは元々どのファイルであるかを表示する必要があります。不正な入力ファイルのように、パス全体を知っているときにデバッグするのが簡単です – mcmcmc

答えて

2

テキストファイルからRDDを作成する場合、あなたはおそらく、あなたがその段階で入力ソースを追加することができますので、ケースクラスにデータをマッピングする:

case class Person(inputPath: String, name: String, age: Int) 
val inputPath = "hdfs://localhost:9000/tmp/demo-input-data/persons.txt" 
val rdd = sc.textFile(inputPath).map { 
    l => 
     val tokens = l.split(",") 
     Person(inputPath, tokens(0), tokens(1).trim().toInt) 
    } 
rdd.collect().foreach(println) 

あなたがしたくない場合

case class InputSourceMetaData(path: String, size: Long) 
case class PersonWithMd(name: String, age: Int, metaData: InputSourceMetaData) 

// Fake the size, for demo purposes only 
val md = InputSourceMetaData(inputPath, size = -1L) 
val rdd = sc.textFile(inputPath).map { 
    l => 
    val tokens = l.split(",") 
    PersonWithMd(tokens(0), tokens(1).trim().toInt, md) 
} 
rdd.collect().foreach(println) 

、あなたはデータフレームにRDDを推進している場合:メタデータと "業務データを" ミックス

import sqlContext.implicits._ 
val df = rdd.toDF() 
df.registerTempTable("x") 

あなたは

あなたは再帰的にorg.apache.hadoop.fs.FileSystem.listFiles()を使用してHDFS内のファイルを読み込むことができ

sqlContext.sql("select name, metadata from x").show() 
sqlContext.sql("select name, metadata.path from x").show() 
sqlContext.sql("select name, metadata.path, metadata.size from x").show() 

アップデートのようにそれを照会することができます。

files内のファイル名のリスト(org.apache.hadoop.fs.LocatedFileStatusを含む標準のScalaのコレクション)を考えると、あなたはファイルごとに1 RDDを作成することができますが:

val rdds = files.map { f => 
    val md = InputSourceMetaData(f.getPath.toString, f.getLen) 

    sc.textFile(md.path).map { 
    l => 
     val tokens = l.split(",") 
     PersonWithMd(tokens(0), tokens(1).trim().toInt, md) 
    } 
} 

今、あなたは、単一の一つにRDDSのリストをreduceすることができます:単一のものにreduce concatsすべてRDDS機能:

val rdd = rdds.reduce(_ ++ _) 
rdd.collect().foreach(println) 

これは動作しますが、これは/大きなファイルでも実行配布する場合、私はテストすることはできません。

+0

私はこれに感謝しますが、唯一の問題は、入力ファイルのフルパスとファイル名。私は入力ディレクトリを指定するだけで、そこにあるすべての入力ファイルを取得します。 – mcmcmc

+0

あなたは現在どの機能を使用していますか?それは 'wholeTextFiles()'ですか? – Beryllium

+0

CSVファイルの場合、私はdatabricks/spark-csvライブラリの 'sqlContext.read.format(" com.databricks.spark.csv ")load("/path/dir/")'を使用しています。 parquetファイルの場合、 'sqlContext.read.parquet("/path/parquetdir/")'を使用します。 – mcmcmc

6

私はちょうど参考

val df = sqlContext.read.parquet("/blah/dir") 

val dfWithCol = df.withColumn("filename",input_file_name()) 
DATAFRAME

の列の一つとして、ファイル名を追加することが分かっ別の解決策:あなたは/がそれを必要としますかなぜ spark load data and add filename as dataframe column

関連する問題