大容量(51GB)のXMLファイル(外部HDD)をデータフレームに読み込み(spark-xml pluginを使用)、簡単なマッピング/それを並べ替えてから、ディスクにCSVファイルとして書き戻します。Spark 2.1.0で大きなファイルを読み込んだときにメモリ不足エラーが発生しました
しかし、私はこれをどのように微調整しても常にjava.lang.OutOfMemoryError: Java heap space
を取得します。私はパーティションの数を増やしていない理由を理解したい
は、個々の部分が小さく、メモリを起こさないように、OOMエラー
はそれがより多くの部分にタスクを分割すべきではない停止します問題?私が試した
(Spark can't possibly be trying to stuff everything in memory and crashing if it doesn't fit, right??)
もの:(5,000万パーティション)に合体
- 再パーティション/読み込みとの少ない番号を使用して
- (初期値は1604である)の書き込み時にデータフレーム執行(6、4、でも私はOOMエラーを取得執行と!)
- 分割ファイルのサイズを小さくする(それは33メガバイトだようにデフォルトに見える)
- RAMのトン(私が持っているすべてに)
spark.memory.storageFraction
- (デフォルトは0.5である)0.2には、30および40(デフォルトは私のために8に
- 増加
spark.memory.fraction
- (デフォルトは0.6である)0.8に減少を与えます
spark.default.parallelism
を設定
val df: DataFrame = spark.sqlContext.read
.option("mode", "DROPMALFORMED")
.format("com.databricks.spark.xml")
.schema(customSchema) // defined previously
.option("rowTag", "row")
.load(s"$pathToInputXML")
println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
// prints 1604
// i pass `numPartitions` as cli arguments
val df2 = df.coalesce(numPartitions)
// filter and select only the cols i'm interested in
val dsout = df2
.where(df2.col("_TypeId") === "1")
.select(
df("_Id").as("id"),
df("_Title").as("title"),
df("_Body").as("body"),
).as[Post]
// regexes to clean the text
val tagPat = "<[^>]+>".r
val angularBracketsPat = "><|>|<"
val whitespacePat = """\s+""".r
// more mapping
dsout
.map{
case Post(id,title,body,tags) =>
val body1 = tagPat.replaceAllIn(body,"")
val body2 = whitespacePat.replaceAllIn(body1," ")
Post(id,title.toLowerCase,body2.toLowerCase, tags.split(angularBracketsPat).mkString(","))
}
.orderBy(rand(SEED)) // random sort
.write // write it back to disk
.option("quoteAll", true)
.mode(SaveMode.Overwrite)
.csv(output)
:)
すべての私のコードここにある(私は何をキャッシュしていないよ注意してください)64Mにspark.files.maxPartitionBytes
を設定しました
NOTES
- 入力分割は(33メガバイトのみ)本当に小さいですので、なぜ私は、8つのスレッド、各処理一方の分割を持つことができませんか?それは本当に私はちょうどそのファイルとforEachPartition(のprintln)を読み取るコードの短いバージョンを書いている私は
UPDATEをSEました(私の記憶を吹くべきではありません。
私は、同じOOMのエラーを取得:
val df: DataFrame = spark.sqlContext.read
.option("mode", "DROPMALFORMED")
.format("com.databricks.spark.xml")
.schema(customSchema)
.option("rowTag", "row")
.load(s"$pathToInputXML")
.repartition(numPartitions)
println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
df
.where(df.col("_PostTypeId") === "1")
.select(
df("_Id").as("id"),
df("_Title").as("title"),
df("_Body").as("body"),
df("_Tags").as("tags")
).as[Post]
.map {
case Post(id, title, body, tags) =>
Post(id, title.toLowerCase, body.toLowerCase, tags.toLowerCase))
}
.foreachPartition { rdd =>
if (rdd.nonEmpty) {
println(s"HI! I'm an RDD and I have ${rdd.size} elements!")
}
}
PS:私は、スパークV 2.1.0を使用しています私のマシンは8つのコアと16ギガバイトのRAMを内蔵しています。
Spark UIで作成されたパーティションのサイズを調べましたか? – Khozzy
@Khozzyこれは、読み込まれたDF用に1604個のパーティションと、DF用に50個のパーティションでアプリを実行したときのものです。[screenshot-spark-ui](http://i.imgur.com/a5LjEmc。 png) –
はい、ジョブの実行中にUIを調べます。各タスクが実行されている時間と、CPUがどのように利用されているかを知ることができます(おそらくは不安定なものがあります)。 – Khozzy