2017-05-05 15 views
3

大容量(51GB)のXMLファイル(外部HDD)をデータフレームに読み込み(spark-xml pluginを使用)、簡単なマッピング/それを並べ替えてから、ディスクにCSVファイルとして書き戻します。Spark 2.1.0で大きなファイルを読み込んだときにメモリ不足エラーが発生しました

しかし、私はこれをどのように微調整しても常にjava.lang.OutOfMemoryError: Java heap spaceを取得します。私はパーティションの数を増やしていない理由を理解したい

は、個々の部分が小さく、メモリを起こさないように、OOMエラー

はそれがより多くの部分にタスクを分割すべきではない停止します問題?私が試した

(Spark can't possibly be trying to stuff everything in memory and crashing if it doesn't fit, right??)

もの:(5,000万パーティション)に合体

  • 再パーティション/読み込みとの少ない番号を使用して
  • (初期値は1604である)の書き込み時にデータフレーム執行(6、4、でも私はOOMエラーを取得執行と!)
  • 分割ファイルのサイズを小さくする(それは33メガバイトだようにデフォルトに見える)
  • RAMのトン(私が持っているすべてに)spark.memory.storageFraction
  • (デフォルトは0.5である)0.2には、30および40(デフォルトは私のために8に spark.default.parallelismを設定
  • 増加spark.memory.fraction
  • (デフォルトは0.6である)0.8に減少を与えます

    val df: DataFrame = spark.sqlContext.read 
        .option("mode", "DROPMALFORMED") 
        .format("com.databricks.spark.xml") 
        .schema(customSchema) // defined previously 
        .option("rowTag", "row") 
        .load(s"$pathToInputXML") 
    
    println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n") 
    // prints 1604 
    
    // i pass `numPartitions` as cli arguments 
    val df2 = df.coalesce(numPartitions) 
    
    // filter and select only the cols i'm interested in 
    val dsout = df2 
        .where(df2.col("_TypeId") === "1") 
        .select(
        df("_Id").as("id"), 
        df("_Title").as("title"), 
        df("_Body").as("body"), 
    ).as[Post] 
    
    // regexes to clean the text 
    val tagPat = "<[^>]+>".r 
    val angularBracketsPat = "><|>|<" 
    val whitespacePat = """\s+""".r 
    
    
    // more mapping 
    dsout 
    .map{ 
        case Post(id,title,body,tags) => 
    
        val body1 = tagPat.replaceAllIn(body,"") 
        val body2 = whitespacePat.replaceAllIn(body1," ") 
    
        Post(id,title.toLowerCase,body2.toLowerCase, tags.split(angularBracketsPat).mkString(",")) 
    
    } 
    .orderBy(rand(SEED)) // random sort 
    .write // write it back to disk 
    .option("quoteAll", true) 
    .mode(SaveMode.Overwrite) 
    .csv(output) 
    
    :)
  • (デフォルトは128Mです)

すべての私のコードここにある(私は何をキャッシュしていないよ注意してください)64Mにspark.files.maxPartitionBytesを設定しました

NOTES

  • 入力分割は(33メガバイトのみ)本当に小さいですので、なぜ私は、8つのスレッド、各処理一方の分割を持つことができませんか?それは本当に私はちょうどそのファイルとforEachPartition(のprintln)を読み取るコードの短いバージョンを書いている私は

UPDATEをSEました(私の記憶を吹くべきではありません。

私は、同じOOMのエラーを取得:

val df: DataFrame = spark.sqlContext.read 
    .option("mode", "DROPMALFORMED") 
    .format("com.databricks.spark.xml") 
    .schema(customSchema) 
    .option("rowTag", "row") 
    .load(s"$pathToInputXML") 
    .repartition(numPartitions) 

println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n") 

df 
    .where(df.col("_PostTypeId") === "1") 
    .select(
    df("_Id").as("id"), 
    df("_Title").as("title"), 
    df("_Body").as("body"), 
    df("_Tags").as("tags") 
).as[Post] 
    .map { 
    case Post(id, title, body, tags) => 
     Post(id, title.toLowerCase, body.toLowerCase, tags.toLowerCase)) 
    } 
    .foreachPartition { rdd => 
    if (rdd.nonEmpty) { 
     println(s"HI! I'm an RDD and I have ${rdd.size} elements!") 
    } 
    } 

PS:私は、スパークV 2.1.0を使用しています私のマシンは8つのコアと16ギガバイトのRAMを内蔵しています。

+0

Spark UIで作成されたパーティションのサイズを調べましたか? – Khozzy

+0

@Khozzyこれは、読み込まれたDF用に1604個のパーティションと、DF用に50個のパーティションでアプリを実行したときのものです。[screenshot-spark-ui](http://i.imgur.com/a5LjEmc。 png) –

+0

はい、ジョブの実行中にUIを調べます。各タスクが実行されている時間と、CPUがどのように利用されているかを知ることができます(おそらくは不安定なものがあります)。 – Khozzy

答えて

0

あなたが二回あなたのRDDを格納していると あなたのロジックがSparkSql

val df: DataFrame = SparkFactory.spark.read 
     .option("mode", "DROPMALFORMED") 
     .format("com.databricks.spark.xml") 
     .schema(customSchema) // defined previously 
     .option("rowTag", "row") 
     .load(s"$pathToInputXML") 
     .coalesce(numPartitions) 

    println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n") 
    // prints 1604 


    // regexes to clean the text 
    val tagPat = "<[^>]+>".r 
    val angularBracketsPat = "><|>|<" 
    val whitespacePat = """\s+""".r 

    // filter and select only the cols i'm interested in 
    df 
     .where(df.col("_TypeId") === "1") 
     .select(
     df("_Id").as("id"), 
     df("_Title").as("title"), 
     df("_Body").as("body"), 
    ).as[Post] 
     .map{ 
     case Post(id,title,body,tags) => 

      val body1 = tagPat.replaceAllIn(body,"") 
      val body2 = whitespacePat.replaceAllIn(body1," ") 

      Post(id,title.toLowerCase,body2.toLowerCase, tags.split(angularBracketsPat).mkString(",")) 

     } 
     .orderBy(rand(SEED)) // random sort 
     .write // write it back to disk 
     .option("quoteAll", true) 
     .mode(SaveMode.Overwrite) 
     .csv(output) 
+0

これをすべて単一のDFにすることは本当に助けにはなりませんでした。私はまだ 'java.lang.OutOfMemoryError:Java heap space'を持っています –

-2

と、このような変更またはフィルタでなければなりませんので、あなたは、あなたの環境変数に以下を追加することによって、ヒープサイズを変更することができます。

  1. 環境変数名:_JAVA_OPTIONS
  2. 環境変数値:-Xmx512M -Xms512M
関連する問題