DF

2016-09-14 10 views
1

を使用して書き出したときにスパークジョブがハングします。アプリケーションがsysDF.write.partitionByにexeし、最初の寄せ木版ファイルを正常に書き出します。しかしその後、一部の時間外労働が発生するまで、すべてのエグゼキュータが殺されてアプリケーションがハングアップします。アクションコードは以下の通りである:マップは、クエリを少し最適化しなければならないフィルタと組み合わせることができるDF

import sqlContext.implicits._ 

val systemRDD = basicLogRDD.map(basicLog => if (basicLog.isInstanceOf[SystemLog]) basicLog.asInstanceOf[SystemLog] else null).filter(_ != null) 
val sysDF = systemRDD.toDF() 
sysDF.write.partitionBy("appId").parquet(outputPath + "/system/date=" + dateY4M2D2) 

val customRDD = basicLogRDD.map(basicLog => if (basicLog.isInstanceOf[CustomLog]) basicLog.asInstanceOf[CustomLog] else null).filter(_ != null) 
val customDF = customRDD.toDF() 
customDF.write.partitionBy("appId").parquet(outputPath + "/custom/date=" + dateY4M2D2) 

val illegalRDD = basicLogRDD.map(basicLog => if (basicLog.isInstanceOf[IllegalLog]) basicLog.asInstanceOf[IllegalLog] else null).filter(_ != null) 
val illegalDF = illegalRDD.toDF() 
illegalDF.write.partitionBy("appId").parquet(outputPath + "/illegal/date=" + dateY4M2D2) 
+0

いくつかの情報、そこにある行の数、および 'appId'のいくつかの異なる値をいくつか提供できますか? –

+0

約100万行と500個の 'appId'があります –

答えて

0

まず、:

val rdd = basicLogRDD.cache() 

rdd.filter(_.isInstanceOf[SystemLog]).write.partitionBy("appId").parquet(outputPath + "/system/date=" + dateY4M2D2) 
rdd.filter(_.isInstanceOf[CustomLog]).write.partitionBy("appId").parquet(outputPath + "/custom/date=" + dateY4M2D2) 
rdd.filter(_.isInstanceOf[IllegalLog]).write.partitionBy("appId").parquet(outputPath + "/illegal/date=" + dateY4M2D2) 

まず、キャッシュすることをお勧めしbasicLogRDDを複数回使用しています。 .cache()演算子はkeep the RDD in memoryになります。 第2に、暗黙的にimplicitly converted to a DataFrameであるため、RDDをDataFrameに明示的に変換する必要はなく、Parquetを使用して格納できるようにします(import sqlContext.implicits._を定義する必要があります)。

+0

あなたの提案をありがとう! –