2016-06-01 14 views
3

私は約60.000.000行のファイルをたくさん持っています。すべてのファイルは、フォーマットでフォーマットされています{timestamp}#{producer}#{messageId}#{data_bytes}\nSparkのマップタスクで大量のメモリが消費される

私は自分のファイルを1つずつ歩き、入力ファイルごとに1つの出力ファイルを作成します。 いくつかの行は前の行に依存しているので、私はその行をプロデューサでグループ化しました。 1つの行が1つ以上の前の行に依存するときはいつでも、それらのプロデューサは常に同じです。 すべての行をグループ化した後、私はそれらをJavaパーサに渡します。 パーサは、解析されたすべてのデータオブジェクトをメモリに格納し、後でJSONとして出力します。

私のジョブがどのように処理されたかを視覚化するために、私は以下の「フローグラフ」をまとめました。私はgroupByKey-Shuffeling-Processを視覚化しなかったことに注意してください。
flow graph

私の問題:

  • を私は、ファイル、プロセス別のタスクを分割を分割し、「部」-fileに各タスクの出力を保存するためにスパークを期待。
  • しかし、私のタスクはメモリが不足して終了する前にYARNによって殺されてしまいます。Container killed by YARN for exceeding memory limits. 7.6 GB of 7.5 GB physical memory used
  • 私のパーサーはすべての解析済みデータオブジェクトをメモリに投げつけています。私はパーサーのコードを変更することはできません。私はそのスパークを確認することができますどのように

    1. は私のコードは

私の質問(私の仕事への入力として600.000ラインごとに例えば2つのファイルを)小さなファイルのために働くことに注意してください私のマップタスクで分割されたすべてのファイルに対して結果が作成されますか?

  • 私のマップ変換val lineMap = lines.map ...(以下のScalaコードを参照)が分割されたrddを生成すると考えました。したがって、私は、rddの値が何らかの方法で分割されてから、2番目のマップ・タスクを呼び出すことを期待しています。

    さらに、このrdd lineMapでsaveAsTextFileを呼び出すと、マップタスクが完了した後に実行される出力タスクが生成されると考えました。私の前提が正しければ、私のエグゼキュータはまだメモリ不足になっています。 Sparkはいくつかの大きなファイル分割を行い、それらを同時に処理するため、Parserがメモリをいっぱいにします。

  • lineMap rddを再分割して、私のパーサーの入力をより多くの(小さな)入力にするのは良い考えですか?
  • 私が気づいていない追加の減速機ステップがありますか?結果がファイルに書き込まれる前に集計されるか、それと同様ですか?

  • Scalaのコード(私はunrelevantコード部分を省い):

    def main(args: Array[String]) { 
        val inputFilePath = args(0) 
        val outputFilePath = args(1) 
    
        val inputFiles = fs.listStatus(new Path(inputFilePath)) 
        inputFiles.foreach(filename => { 
         processData(filename.getPath, ...) 
        }) 
    } 
    
    
    def processData(filePath: Path, ...) { 
        val lines = sc.textFile(filePath.toString()) 
        val lineMap = lines.map(line => (line.split(" ")(1), line)).groupByKey() 
    
        val parsedLines = lineMap.map{ case(key, values) => parseLinesByKey(key, values, config) } 
        //each output should be saved separately 
        parsedLines.saveAsTextFile(outputFilePath.toString() + "/" + filePath.getName)  
    } 
    
    
    def parseLinesByKey(key: String, values: Iterable[String], config : Config) = { 
        val importer = new LogFileImporter(...) 
        importer.parseData(values.toIterator.asJava, ...) 
    
        //importer from now contains all parsed data objects in memory that could be parsed 
        //from the given values. 
    
        val jsonMapper = getJsonMapper(...) 
        val jsonStringData = jsonMapper.getValueFromString(importer.getDataObject) 
    
        (key, jsonStringData) 
    } 
    
    +2

    https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html – zero323

    +0

    エラーログをトレースすることで、メモリオーバーフローの原因となっている行を特定できますか? – GameOfThrows

    +0

    エラーログには、[リンク](http://i.imgur.com/cY6Hk5S.png)が表示されます。 – j9dy

    答えて

    0

    は、私は自分の限界を取り除くためにgroupByKey呼び出しを削除し、新しいFileInputFormatなどRecordReaderを実装することによって、これを固定しています行は他の行に依存します。今のところ、各分割に前の分割の50.000バイトのオーバーヘッドが含まれるように実装しました。これにより、前の行に依存するすべての行を正しく解析できます。

    私はこれから先の分割の最後の50.000バイトを調べますが、現在の分割の解析に実際に影響を与える行だけをコピーします。したがって、私はオーバーヘッドを最小限に抑え、依然として高度に並列化可能なタスクを取得します。

    次のリンクは私を正しい方向にドラッグしました。 FileInputFormat/RecordReaderのトピックが(それは少なくとも私のためだった)一見非常に複雑であるので、これらの記事を読んで、これはあなたの問題かどうかに適しているかどうかを理解することが良いです:

    関連するコード部分。作者(@Gurdt)はこれを使って、チャットメッセージにエスケープされた行の戻り値( "\"で終わる行)が含まれているかどうかを検出し、エスケープされた行をエスケープされない\ nが見つかるまで追加します。これにより、2つ以上の行にまたがるメッセージを取得することができます。 Scalaで書かれたコード:

    class MyRecordReader() extends RecordReader[LongWritable, Text] { 
        var start, end, position = 0L 
        var reader: LineReader = null 
        var key = new LongWritable 
        var value = new Text 
    
        override def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit = { 
         // split position in data (start one byte earlier to detect if 
         // the split starts in the middle of a previous record) 
         val split = inputSplit.asInstanceOf[FileSplit] 
         start = 0.max(split.getStart - 1) 
         end = start + split.getLength 
    
         // open a stream to the data, pointing to the start of the split 
         val stream = split.getPath.getFileSystem(context.getConfiguration) 
         .open(split.getPath) 
         stream.seek(start) 
         reader = new LineReader(stream, context.getConfiguration) 
    
         // if the split starts at a newline, we want to start yet another byte 
         // earlier to check if the newline was escaped or not 
         val firstByte = stream.readByte().toInt 
         if(firstByte == '\n') 
          start = 0.max(start - 1) 
         stream.seek(start) 
    
         if(start != 0) 
          skipRemainderFromPreviousSplit(reader) 
        } 
    
        def skipRemainderFromPreviousSplit(reader: LineReader): Unit = { 
         var readAnotherLine = true 
         while(readAnotherLine) { 
          // read next line 
          val buffer = new Text() 
          start += reader.readLine(buffer, Integer.MAX_VALUE, Integer.MAX_VALUE) 
          pos = start 
    
          // detect if delimiter was escaped 
          readAnotherLine = buffer.getLength >= 1 && // something was read 
          buffer.charAt(buffer.getLength - 1) == '\\' && // newline was escaped 
          pos <= end // seek head hasn't passed the split 
         } 
        } 
    
        override def nextKeyValue(): Boolean = { 
         key.set(pos) 
    
         // read newlines until an unescaped newline is read 
         var lastNewlineWasEscaped = false 
         while (pos < end || lastNewlineWasEscaped) { 
          // read next line 
          val buffer = new Text 
          pos += reader.readLine(buffer, Integer.MAX_VALUE, Integer.MAX_VALUE) 
    
          // append newly read data to previous data if necessary 
          value = if(lastNewlineWasEscaped) new Text(value + "\n" + buffer) else buffer 
    
          // detect if delimiter was escaped 
          lastNewlineWasEscaped = buffer.charAt(buffer.getLength - 1) == '\\' 
    
          // let Spark know that a key-value pair is ready! 
          if(!lastNewlineWasEscaped) 
           return true 
         } 
    
         // end of split reached? 
         return false 
        } 
    } 
    

    class MyFileInputFormat extends FileInputFormat[LongWritable, Text] { 
        override def createRecordReader(split: InputSplit, context: TaskAttemptContext): 
        RecordReader[LongWritable, Text] = new MyRecordReader() 
    } 
    

    RecordReader

    使用

    val conf = new Configuration(sparkContext.hadoopConfiguration) 
    val rdd = sparkContext.newAPIHadoopFile("data.txt", classOf[MyFileInputFormat], 
    classOf[LongWritable], classOf[Text], conf) 
    

    FileInputFormat注:あなたがあなたの中にgetCurrentKey、のgetCurrentValue、近いとでgetProgressを実装する必要がありますRecordReaderも同様です。

    関連する問題