2017-03-09 9 views
1

最新の学生レコードを学生日付でフィルタリングするためのスパークジョブを作成しています。しかし、これを数十万のレコードで試してみると、うまくいきます。しかし、私が多数のレコードでそれを実行すると、sparkjobがエラー以下に戻ります。GCオーバヘッド限度を超過して結合したスパークジョブ

私はテーブルからすべてのデータを読み込み、intをRDDに入れているので、このエラーが起こると思います。私のテーブルには約4.2百万のレコードが含まれているからです。そうであれば、それらのデータを効率的にロードして操作を成功させるためのよりよい方法がありますか?

誰もがデータベースの永続性に関連し、この

WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 10.10.10.10): java.lang.OutOfMemoryError: GC overhead limit exceeded 
at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2157) 
at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1964) 
at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3316) 
at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:463) 
at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3040) 
at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2288) 
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2681) 
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2551) 
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1861) 
at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1962) 
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anon$1.<init>(JDBCRDD.scala:408) 
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:379) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332) 
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330) 
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935) 
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926) 
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866) 
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926) 
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670) 
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 

17/03/09 10:54:09 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 2, 10.10.10.10, partition 0, PROCESS_LOCAL, 5288 bytes) 
17/03/09 10:54:09 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 2 on executor id: 1 hostname: 10.10.10.10. 
17/03/09 10:54:09 WARN TransportChannelHandler: Exception in connection from /10.10.10.10:48464 
java.io.IOException: Connection reset by peer 
at sun.nio.ch.FileDispatcherImpl.read0(Native Method) 
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) 
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) 
at sun.nio.ch.IOUtil.read(IOUtil.java:192) 
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) 
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313) 
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881) 
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel. java:242) 
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNio ByteChannel.java:119) 
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) 
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoo p.java:468) 
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:38 2) 
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) 
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEvent Executor.java:111) 
at java.lang.Thread.run(Thread.java:745) 
17/03/09 10:54:09 ERROR TaskSchedulerImpl: Lost executor 1 on  10.10.10.10: Remote RPC client disassociated. Likely due to containers  exceeding thresholds, or network issues. Check driver logs for WARN  messages. 
17/03/09 10:54:09 INFO StandaloneAppClient$ClientEndpoint: Executor  updated: app-20170309105209-0032/1 is now EXITED (Command exited with code  52) 

コード

object StudentDataPerformanceEnhancerImpl extends studentDataPerformanceEnhancer { 
    val LOG = LoggerFactory.getLogger(this.getClass.getName) 
    val USER_PRIMARY_KEY = "user_id"; 
    val COURSE_PRIMARY_KEY = "course_id"; 

    override def extractData(sparkContext: SparkContext, sparkSession: SparkSession, jobConfiguration: JobConfiguration): Unit = { 
     val context = sparkSession.read.format("jdbc") 
     .option("driver", "com.mysql.jdbc.Driver") 
     .option("url", jobConfiguration.jdbcURL) 
     .option("dbtable", "student_student") 
     .option("user", test_user) 
     .option("password", test_password) 
     .load() 
     context.cache() 

     val mainRDD = context.rdd.map(k => ((k.getLong(k.fieldIndex(USER_PRIMARY_KEY)), 
      k.getLong(k.fieldIndex(COURSE_PRIMARY_KEY)), 
      k.getTimestamp(k.fieldIndex("student_date_time"))), 
      (k.getLong(k.fieldIndex(USER_PRIMARY_KEY)), 
       k.getLong(k.fieldIndex(COURSE_PRIMARY_KEY)), 
       k.getTimestamp(k.fieldIndex("student_date_time")), 
       k.getString(k.fieldIndex("student_student_index")), 
       k.getLong(k.fieldIndex("student_category_pk")), 
       k.getString(k.fieldIndex("effeciency")), 
       k.getString(k.fieldIndex("net_score")), 
       k.getString(k.fieldIndex("avg_score")), 
       k.getString(k.fieldIndex("test_score"))))).persist(StorageLevel.DISK_ONLY) 

     LOG.info("Data extractions started....!") 
     try { 
      val studentCompositeRDD = context.rdd.map(r => ((r.getLong(r.fieldIndex(USER_PRIMARY_KEY)), 
      r.getLong(r.fieldIndex(COURSE_PRIMARY_KEY))), 
      r.getTimestamp(r.fieldIndex("student_date_time")))).reduceByKey((t1, t2) => if (t1.after(t2)) t1 else t2) 
       .map(t => ((t._1._1, t._1._2, t._2), t._2)).persist(StorageLevel.DISK_ONLY) 
      val filteredRDD = mainRDD.join(studentCompositeRDD).map(k => k._2._1) 
      DataWriter.persistLatestData(filteredRDD) 
     } catch { 
      case e: Exception => LOG.error("Error in spark job: " + e.getMessage) 
     } 
    } 
} 

マイDataWriterクラスが

object DataWriter { 
    def persistLatestStudentRiskData(rDD: RDD[(Long, Long, Timestamp, String, Long, String, String, String, String)]): Unit = { 
     var jdbcConnection: java.sql.Connection = null 
     try { 
      jdbcConnection = DatabaseUtil.getConnection 
      if (jdbcConnection != null) { 
       val statement = "{call insert_latest_student_risk (?,?,?,?,?,?,?,?,?)}" 
       val callableStatement = jdbcConnection.prepareCall(statement) 

       rDD.collect().foreach(x => sendLatestStudentRiskData(callableStatement, x)) 
      } 
     } catch { 
      case e: SQLException => LOG.error("Error in executing insert_latest_student_risk stored procedure : " + e.getMessage) 
      case e: RuntimeException => LOG.error("Error in the latest student persistence: " + e.getMessage) 
      case e: Exception => LOG.error("Error in the latest student persistence: " + e.getMessage) 
     } finally { 
      if (jdbcConnection != null) { 
       try { 
        jdbcConnection.close() 
       } catch { 
        case e: SQLException => LOG.error("Error in jdbc connection close : " + e.getMessage) 
        case e: Exception => LOG.error("Error in executing insert_latest_student_risk stored procedure : " + e.getMessage) 
       } 
      } 
     } 
    } 

    def sendLatestStudentRiskData(callableStatement: java.sql.CallableStatement, 
     latestStudentData: (Long, Long, Timestamp, String, Long, 
    String, String, String, String)): Unit = { 
     try { 
      callableStatement.setLong(1, latestStudentData._1) 
      callableStatement.setLong(2, latestStudentData._2) 
      callableStatement.setTimestamp(3, latestStudentData._3) 
      callableStatement.setString(4, latestStudentData._4) 
      callableStatement.setLong(5, latestStudentData._5) 
      callableStatement.setString(6, latestStudentData._6) 
      callableStatement.setString(7, latestStudentData._7) 
      callableStatement.setString(8, latestStudentData._8) 
      callableStatement.setString(9, latestStudentData._9) 

      callableStatement.executeUpdate 
     } catch { 
      case e: SQLException => LOG.error("Error in executing insert_latest_student_risk stored procedure : " + e.getMessage) 
     } 
    } 
} 
+0

「DataWriter.persistLatestData」はどこに定義されていますか? – puhlen

+0

@puhlen DataWriter.persistLatestDataを使用して質問を更新しました – Kepler

答えて

0

の下の問題は、あなたがしていることではありません解決するために私を助けてくださいRDDにデータを格納するということは、RDDからドライバメモリにデータを取り込むことです。具体的には、問題は、データを保持するために使用しているcollect呼び出しです。削除する必要があります。 collectは、RDD全体をドライバ上のメモリに持ち込み、データを処理するためにsparkとクラスタを使用しなくなり、データサイズが非常に小さい場合を除き、メモリが急に使い果たされます。 collectは、スパークプロセスで使用することはめったにありません。少量のデータを使用して開発やデバッグを行う場合に最も便利です。いくつかのサポート操作では本番アプリケーションでは使用されますが、主要データフローとしては使用されません。

spark-sqlを使用して、それを活用して収集するコールを削除すると、Sparkはjdbcに直接書き込むことができます。

関連する問題