2016-09-30 5 views
0

をとる場合、次のように私は100万行を含むファイルを読み取る(各行は、US1.234.567B1としてコードを有する)とのうちいくつかのパターンを取得し、スパーク・アプリケーションを持っている:スパークのOutOfMemoryError大きな入力ファイル

val codes = sc.textFile("/data/codes.txt") 

    def getPattern(code: String) = code.replaceAll("\\d", "d") 

    val patterns: RDD[(String, Int)] = codes 
    .groupBy(getPattern) 
    .mapValues(_.size) 
    .sortBy(- _._2) 

    patterns 
    .map { case (pattern, size) => s"$size\t$pattern" } 
    .saveAsTextFile("/tmp/patterns") 

私はこれをmaster = local [*]で実行しており、それはjava.lang.OutOfMemoryError: GC overhead limit exceededで失敗します。

なぜですか?

私はSparkが十分なハードディスク容量を持っている限り、どんなサイズの入力も処理できると思っていました。

+1

2ドキュメント: https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html https://github.com/awesome -spark/spark-gotchas –

答えて

2

あなたはアンチパターンをスパーク使用しようとしているロングショート:

として簡単に例えば表現することができる
.groupBy(getPattern) 
.mapValues(_.size) 

:私はスパークが任意のサイズを扱うことができると考え

codes.keyBy(getPattern).mapValues(_ => 1L).reduceByKey(_ + _).sortBy(_._2, false) 

入力の。

通常、不可能にしない限り、スケールアウトできます。 RDD上のgroup/groupByKeyは、各キーのローカルコレクションを作成します。これらのそれぞれは、一人のエグゼキュータの記憶にあります。

1

はいsparkは非常に大きなファイルを処理できますが、並列処理の単位はエグゼキュータです。 'メモリ不足エラー'は、スパークエグゼキュータメモリまたはスパークドライバメモリが不足しているためです。 spark.executor.memoryとspark.driver.memoryを増やし、ジョブをサブミットする前にエグゼキュータの数を調整してみてください。

これらの値は、spark-submit中にプロパティファイルまたはSparkConfで、またはコマンドラインで直接設定できます。リンクGROUPBYは避けるべき理由を説明http://spark.apache.org/docs/latest/configuration.html

関連する問題