1

2つの小さなデータセットで計算を実行するSparkアプリケーション(Spark 1.6.3クラスタ)を実行しています。 S3パーケットファイル。Spark DataFrame java.lang.OutOfMemoryError:長いループ実行時にGCオーバーヘッドの上限を超えました

public void doWork(JavaSparkContext sc, Date writeStartDate, Date writeEndDate, String[] extraArgs) throws Exception { 
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); 
    S3Client s3Client = new S3Client(ConfigTestingUtils.getBasicAWSCredentials()); 

    boolean clearOutputBeforeSaving = false; 
    if (extraArgs != null && extraArgs.length > 0) { 
     if (extraArgs[0].equals("clearOutput")) { 
      clearOutputBeforeSaving = true; 
     } else { 
      logger.warn("Unknown param " + extraArgs[0]); 
     } 
    } 

    Date currRunDate = new Date(writeStartDate.getTime()); 
    while (currRunDate.getTime() < writeEndDate.getTime()) { 
     try { 

      SparkReader<FirstData> sparkReader = new SparkReader<>(sc); 
      JavaRDD<FirstData> data1 = sparkReader.readDataPoints(
        inputDir, 
        currRunDate, 
        getMinOfEndDateAndNextDay(currRunDate, writeEndDate)); 
      // Normalize to 1 hours & 0.25 degrees 
      JavaRDD<FirstData> distinctData1 = data1.distinct(); 

      // Floor all (distinct) values to 6 hour windows 
      JavaRDD<FirstData> basicData1BySixHours = distinctData1.map(d1 -> new FirstData(
        d1.getId(), 
        TimeUtils.floorTimePerSixHourWindow(d1.getTimeStamp()), 
        d1.getLatitude(), 
        d1.getLongitude())); 

      // Convert Data1 to Dataframes 
      DataFrame data1DF = sqlContext.createDataFrame(basicData1BySixHours, FirstData.class); 
      data1DF.registerTempTable("data1"); 

      // Read Data2 DataFrame 
      String currDateString = TimeUtils.getSimpleDailyStringFromDate(currRunDate); 
      String inputS3Path = basedirInput + "/dt=" + currDateString; 
      DataFrame data2DF = sqlContext.read().parquet(inputS3Path); 
      data2DF.registerTempTable("data2"); 

      // Join data1 and data2 
      DataFrame mergedDataDF = sqlContext.sql("SELECT D1.Id,D2.beaufort,COUNT(1) AS hours " + 
        "FROM data1 as D1,data2 as D2 " + 
        "WHERE D1.latitude=D2.latitude AND D1.longitude=D2.longitude AND D1.timeStamp=D2.dataTimestamp " + 
        "GROUP BY D1.Id,D1.timeStamp,D1.longitude,D1.latitude,D2.beaufort"); 

      // Create histogram per ID 
      JavaPairRDD<String, Iterable<Row>> mergedDataRows = mergedDataDF.toJavaRDD().groupBy(md -> md.getAs("Id")); 
      JavaRDD<MergedHistogram> mergedHistogram = mergedDataRows.map(new MergedHistogramCreator()); 

      logger.info("Number of data1 results: " + data1DF.select("lId").distinct().count()); 
      logger.info("Number of coordinates with data: " + data1DF.select("longitude","latitude").distinct().count()); 
      logger.info("Number of results with beaufort histograms: " + mergedDataDF.select("Id").distinct().count()); 

      // Save to parquet 
      String outputS3Path = basedirOutput + "/dt=" + TimeUtils.getSimpleDailyStringFromDate(currRunDate); 
      if (clearOutputBeforeSaving) { 
       writeWithCleanup(outputS3Path, mergedHistogram, MergedHistogram.class, sqlContext, s3Client); 
      } else { 
       write(outputS3Path, mergedHistogram, MergedHistogram.class, sqlContext); 
      } 
     } finally { 
      TimeUtils.progressToNextDay(currRunDate); 
     } 
    } 
} 

public void write(String outputS3Path, JavaRDD<MergedHistogram> outputRDD, Class outputClass, SQLContext sqlContext) { 
    // Apply a schema to an RDD of JavaBeans and save it as Parquet. 
    DataFrame fullDataDF = sqlContext.createDataFrame(outputRDD, outputClass); 
    fullDataDF.write().parquet(outputS3Path); 
} 

public void writeWithCleanup(String outputS3Path, JavaRDD<MergedHistogram> outputRDD, Class outputClass, 
          SQLContext sqlContext, S3Client s3Client) { 
    String fileKey = S3Utils.getS3Key(outputS3Path); 
    String bucket = S3Utils.getS3Bucket(outputS3Path); 

    logger.info("Deleting existing dir: " + outputS3Path); 
    s3Client.deleteAll(bucket, fileKey); 

    write(outputS3Path, outputRDD, outputClass, sqlContext); 
} 

public Date getMinOfEndDateAndNextDay(Date startTime, Date proposedEndTime) { 
    long endOfDay = startTime.getTime() - startTime.getTime() % MILLIS_PER_DAY + MILLIS_PER_DAY ; 
    if (endOfDay < proposedEndTime.getTime()) { 
     return new Date(endOfDay); 
    } 
    return proposedEndTime; 
} 

DATA1の大きさは約150,000とDATA2は約500,000:

は、ここに私のコードです。

私のコードは基本的にいくつかのデータ操作を行い、2つのデータオブジェクトをマージし、少し操作し、いくつかの統計を出力し、寄木細工に保存します。

スパークにはサーバーあたり25GBのメモリがあり、コードはうまく動作します。 各反復には約2〜3分かかります。

問題は、大きな日付セットで実行すると始まります。それは走った

java.lang.OutOfMemoryError: GC overhead limit exceeded 
    at scala.collection.immutable.List.$colon$colon$colon(List.scala:127) 
    at org.json4s.JsonDSL$JsonListAssoc.$tilde(JsonDSL.scala:98) 
    at org.apache.spark.util.JsonProtocol$.taskEndToJson(JsonProtocol.scala:139) 
    at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:72) 
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144) 
    at org.apache.spark.scheduler.EventLoggingListener.onTaskEnd(EventLoggingListener.scala:164) 
    at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:42) 
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31) 
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31) 
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55) 
    at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:38) 
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:87) 
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:72) 
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:72) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:71) 
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1181) 
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:70) 

前回は、それは233回の反復後に墜落:

はしばらくして、私はのOutOfMemoryを取得します。

それは上のクラッシュした行はこのでした:

logger.info("Number of coordinates with data: " + data1DF.select("longitude","latitude").distinct().count()); 

誰もが最終的なクラッシュの原因になることができるかを教えていただけますか?

答えて

0

誰もがこの解決策を見つけられるとは確信していませんが、Sparkクラスタを2.2.0にアップグレードすると問題が解決されたようです。

私は数日間アプリケーションを実行しましたが、クラッシュはまだ発生していません。

0

このエラーは、GCがプロセスの実行時間の98%以上を占める場合に発生します。ステージのタブhttp://master:4040にアクセスしてSpark Web UIでGC時間を監視できます。

sparkアプリケーションを送信中に--confを使用して、spark。{driver/executor} .memoryを使用して、ドライバ/エグゼキュータ(このエラーが発生しているもの)メモリを増やしてみてください。

もう1つのことは、Javaが使用しているガベージコレクタを変更することです。この記事を読む:https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.htmlなぜGCオーバーヘッドエラーが発生するのか、どのガベージコレクタがアプリケーションに最適なのかを非常に明確に説明しています。

+0

私はステージタブを時折チェックしましたが、メモリ消費量の増加や異常なものは一切表示されませんでした。 – AlexM

+0

投稿したブログ投稿を読んだことがあります。 スパークの古いバージョンについて話しているようです。それ以来、特にメモリ管理部門で多くの変更がありました。 また、記事ではRDDについて議論していますが、主にDataFramesで作業しています。 – AlexM

+0

spark web ui https://i.stack.imgur.com/YkwYR.pngにこのようなものがありますか?おそらく、各タスクのGC時間が非常に長くなるでしょう。また、ドライバ/エグゼキュータのメモリを増やしてみましたか?デフォルトは1GBです。システムで使用可能な最大値に設定し、動作するかどうか確認してください。 –

関連する問題