を使用して書き出したときにスパークジョブがハングします。アプリケーションがsysDF.write.partitionBy
にexeし、最初の寄せ木版ファイルを正常に書き出します。しかしその後、一部の時間外労働が発生するまで、すべてのエグゼキュータが殺されてアプリケーションがハングアップします。アクションコードは以下の通りである:マップは、クエリを少し最適化しなければならないフィルタと組み合わせることができるDF
import sqlContext.implicits._
val systemRDD = basicLogRDD.map(basicLog => if (basicLog.isInstanceOf[SystemLog]) basicLog.asInstanceOf[SystemLog] else null).filter(_ != null)
val sysDF = systemRDD.toDF()
sysDF.write.partitionBy("appId").parquet(outputPath + "/system/date=" + dateY4M2D2)
val customRDD = basicLogRDD.map(basicLog => if (basicLog.isInstanceOf[CustomLog]) basicLog.asInstanceOf[CustomLog] else null).filter(_ != null)
val customDF = customRDD.toDF()
customDF.write.partitionBy("appId").parquet(outputPath + "/custom/date=" + dateY4M2D2)
val illegalRDD = basicLogRDD.map(basicLog => if (basicLog.isInstanceOf[IllegalLog]) basicLog.asInstanceOf[IllegalLog] else null).filter(_ != null)
val illegalDF = illegalRDD.toDF()
illegalDF.write.partitionBy("appId").parquet(outputPath + "/illegal/date=" + dateY4M2D2)
いくつかの情報、そこにある行の数、および 'appId'のいくつかの異なる値をいくつか提供できますか? –
約100万行と500個の 'appId'があります –