私は約60.000.000行のファイルをたくさん持っています。すべてのファイルは、フォーマットでフォーマットされています{timestamp}#{producer}#{messageId}#{data_bytes}\n
Sparkのマップタスクで大量のメモリが消費される
私は自分のファイルを1つずつ歩き、入力ファイルごとに1つの出力ファイルを作成します。 いくつかの行は前の行に依存しているので、私はその行をプロデューサでグループ化しました。 1つの行が1つ以上の前の行に依存するときはいつでも、それらのプロデューサは常に同じです。 すべての行をグループ化した後、私はそれらをJavaパーサに渡します。 パーサは、解析されたすべてのデータオブジェクトをメモリに格納し、後でJSONとして出力します。
私のジョブがどのように処理されたかを視覚化するために、私は以下の「フローグラフ」をまとめました。私はgroupByKey
-Shuffeling-Processを視覚化しなかったことに注意してください。
私の問題:
- を私は、ファイル、プロセス別のタスクを分割を分割し、「部」-fileに各タスクの出力を保存するためにスパークを期待。
- しかし、私のタスクはメモリが不足して終了する前にYARNによって殺されてしまいます。
Container killed by YARN for exceeding memory limits. 7.6 GB of 7.5 GB physical memory used
- 私のパーサーはすべての解析済みデータオブジェクトをメモリに投げつけています。私はパーサーのコードを変更することはできません。私はそのスパークを確認することができますどのように
- :
- は私のコードは
私の質問(私の仕事への入力として600.000ラインごとに例えば2つのファイルを)小さなファイルのために働くことに注意してください私のマップタスクで分割されたすべてのファイルに対して結果が作成されますか?
私のマップ変換val lineMap = lines.map ...
(以下のScalaコードを参照)が分割されたrddを生成すると考えました。したがって、私は、rddの値が何らかの方法で分割されてから、2番目のマップ・タスクを呼び出すことを期待しています。
さらに、このrdd lineMap
でsaveAsTextFileを呼び出すと、マップタスクが完了した後に実行される出力タスクが生成されると考えました。私の前提が正しければ、私のエグゼキュータはまだメモリ不足になっています。 Sparkはいくつかの大きなファイル分割を行い、それらを同時に処理するため、Parserがメモリをいっぱいにします。
lineMap
rddを再分割して、私のパーサーの入力をより多くの(小さな)入力にするのは良い考えですか?Scalaのコード(私はunrelevantコード部分を省い):
def main(args: Array[String]) {
val inputFilePath = args(0)
val outputFilePath = args(1)
val inputFiles = fs.listStatus(new Path(inputFilePath))
inputFiles.foreach(filename => {
processData(filename.getPath, ...)
})
}
def processData(filePath: Path, ...) {
val lines = sc.textFile(filePath.toString())
val lineMap = lines.map(line => (line.split(" ")(1), line)).groupByKey()
val parsedLines = lineMap.map{ case(key, values) => parseLinesByKey(key, values, config) }
//each output should be saved separately
parsedLines.saveAsTextFile(outputFilePath.toString() + "/" + filePath.getName)
}
def parseLinesByKey(key: String, values: Iterable[String], config : Config) = {
val importer = new LogFileImporter(...)
importer.parseData(values.toIterator.asJava, ...)
//importer from now contains all parsed data objects in memory that could be parsed
//from the given values.
val jsonMapper = getJsonMapper(...)
val jsonStringData = jsonMapper.getValueFromString(importer.getDataObject)
(key, jsonStringData)
}
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html – zero323
エラーログをトレースすることで、メモリオーバーフローの原因となっている行を特定できますか? – GameOfThrows
エラーログには、[リンク](http://i.imgur.com/cY6Hk5S.png)が表示されます。 – j9dy