テキストファイルからRDDを作成する場合、あなたはおそらく、あなたがその段階で入力ソースを追加することができますので、ケースクラスにデータをマッピングする:
case class Person(inputPath: String, name: String, age: Int)
val inputPath = "hdfs://localhost:9000/tmp/demo-input-data/persons.txt"
val rdd = sc.textFile(inputPath).map {
l =>
val tokens = l.split(",")
Person(inputPath, tokens(0), tokens(1).trim().toInt)
}
rdd.collect().foreach(println)
あなたがしたくない場合
case class InputSourceMetaData(path: String, size: Long)
case class PersonWithMd(name: String, age: Int, metaData: InputSourceMetaData)
// Fake the size, for demo purposes only
val md = InputSourceMetaData(inputPath, size = -1L)
val rdd = sc.textFile(inputPath).map {
l =>
val tokens = l.split(",")
PersonWithMd(tokens(0), tokens(1).trim().toInt, md)
}
rdd.collect().foreach(println)
、あなたはデータフレームにRDDを推進している場合:メタデータと "業務データを" ミックス
import sqlContext.implicits._
val df = rdd.toDF()
df.registerTempTable("x")
あなたは
あなたは再帰的にorg.apache.hadoop.fs.FileSystem.listFiles()
を使用してHDFS内のファイルを読み込むことができ
sqlContext.sql("select name, metadata from x").show()
sqlContext.sql("select name, metadata.path from x").show()
sqlContext.sql("select name, metadata.path, metadata.size from x").show()
アップデートのようにそれを照会することができます。
値files
内のファイル名のリスト(org.apache.hadoop.fs.LocatedFileStatus
を含む標準のScalaのコレクション)を考えると、あなたはファイルごとに1 RDDを作成することができますが:
val rdds = files.map { f =>
val md = InputSourceMetaData(f.getPath.toString, f.getLen)
sc.textFile(md.path).map {
l =>
val tokens = l.split(",")
PersonWithMd(tokens(0), tokens(1).trim().toInt, md)
}
}
今、あなたは、単一の一つにRDDSのリストをreduce
することができます:単一のものにreduce
concatsすべてRDDS機能:
val rdd = rdds.reduce(_ ++ _)
rdd.collect().foreach(println)
これは動作しますが、これは/大きなファイルでも実行配布する場合、私はテストすることはできません。
? –
各レコードは元々どのファイルであるかを表示する必要があります。不正な入力ファイルのように、パス全体を知っているときにデバッグするのが簡単です – mcmcmc