最近スパークストリーミングを開始し、チェックポイントを実装しました。チェックポイントをHDFSに保存しています。ストリーミングが失敗したときに、最後のチェックポイントに戻ることはできますが、NullPointerExceptionが発生し、ストリーミングジョブが強制終了されます。私はHDFSのチェックポイントを見ることができます。なぜHDFSにchckpointがあるのに例外が出るのかわからない。どんな入力も参考になります。以下はチェックポイントから再起動すると、NullPointerExceptionでストリーミングジョブが失敗するのはなぜですか?
17/04/10 16:30:47 INFO JobGenerator: Batches during down time (2 batches):1491841680000 ms, 1491841800000 ms
17/04/10 16:30:47 INFO JobGenerator: Batches pending processing (0 batches):
17/04/10 16:30:47 INFO JobGenerator: Batches to reschedule (2 batches): 1491841680000 ms, 1491841800000 ms
17/04/10 16:30:48 INFO JobScheduler: Added jobs for time 1491841680000 ms
17/04/10 16:30:48 INFO JobScheduler: Starting job streaming job 1491841680000 ms.0 from job set of time 1491841680000 ms
17/04/10 16:30:48 INFO SparkContext: Starting job: isEmpty at piadj.scala:34
17/04/10 16:30:48 INFO DAGScheduler: Got job 0 (isEmpty at piadj.scala:34) with 1 output partitions
17/04/10 16:30:48 INFO DAGScheduler: Final stage: ResultStage 0 (isEmpty at piadj.scala:34)
17/04/10 16:30:48 INFO DAGScheduler: Parents of final stage: List()
17/04/10 16:30:48 INFO DAGScheduler: Missing parents: List()
17/04/10 16:30:48 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at piadj.scala:32), which has no missing parents
17/04/10 16:30:48 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 4.1 KB, free 4.1 KB)
17/04/10 16:30:48 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.1 KB, free 6.1 KB)
17/04/10 16:30:48 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.26.118.23:35738 (size: 2.1 KB, free: 5.8 GB)
17/04/10 16:30:48 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1008
17/04/10 16:30:48 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at piadj.scala:32)
17/04/10 16:30:48 INFO YarnClusterScheduler: Adding task set 0.0 with 1 tasks
17/04/10 16:30:48 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, oser402370.wal-mart.com, partition 0,ANY, 2108 bytes)
17/04/10 16:30:48 INFO JobScheduler: Added jobs for time 1491841800000 ms
17/04/10 16:30:48 INFO RecurringTimer: Started timer for JobGenerator at time 1491841920000
17/04/10 16:30:48 INFO JobGenerator: Restarted JobGenerator at 1491841920000 ms
17/04/10 16:30:48 INFO JobScheduler: Starting job streaming job 1491841800000 ms.0 from job set of time 1491841800000 ms
17/04/10 16:30:48 INFO JobScheduler: Started JobScheduler
17/04/10 16:30:48 INFO StreamingContext: StreamingContext started
17/04/10 16:30:48 INFO SparkContext: Starting job: isEmpty at piadj.scala:34
17/04/10 16:30:48 INFO DAGScheduler: Got job 1 (isEmpty at piadj.scala:34) with 1 output partitions
17/04/10 16:30:48 INFO DAGScheduler: Final stage: ResultStage 1 (isEmpty at piadj.scala:34)
17/04/10 16:30:48 INFO DAGScheduler: Parents of final stage: List()
17/04/10 16:30:48 INFO DAGScheduler: Missing parents: List()
17/04/10 16:30:48 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[3] at map at piadj.scala:32), which has no missing parents
17/04/10 16:30:48 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.1 KB, free 10.2 KB)
17/04/10 16:30:48 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.1 KB, free 12.3 KB)
17/04/10 16:30:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.26.118.23:35738 (size: 2.1 KB, free: 5.8 GB)
17/04/10 16:30:48 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1008
17/04/10 16:30:48 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[3] at map at piadj.scala:32)
17/04/10 16:30:48 INFO YarnClusterScheduler: Adding task set 1.0 with 1 tasks
17/04/10 16:30:48 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, XXXXXXX, partition 0,ANY, 2108 bytes)
17/04/10 16:30:48 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on XXXXXXX (size: 2.1 KB, free: 4.3 GB)
17/04/10 16:30:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on XXXXXXX (size: 2.1 KB, free: 4.3 GB)
17/04/10 16:30:49 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1142 ms on XXXXXXX (1/1)
17/04/10 16:30:49 INFO YarnClusterScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/04/10 16:30:49 INFO DAGScheduler: ResultStage 0 (isEmpty at piadj.scala:34) finished in 1.151 s
17/04/10 16:30:49 INFO DAGScheduler: Job 0 finished: isEmpty at piadj.scala:34, took 1.466449 s
17/04/10 16:30:49 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 979 ms on XXXXXXX (1/1)
17/04/10 16:30:49 INFO YarnClusterScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
17/04/10 16:30:49 INFO DAGScheduler: ResultStage 1 (isEmpty at piadj.scala:34) finished in 0.983 s
17/04/10 16:30:49 INFO DAGScheduler: Job 1 finished: isEmpty at piadj.scala:34, took 1.006658 s
17/04/10 16:30:49 INFO JobScheduler: Finished job streaming job 1491841680000 ms.0 from job set of time 1491841680000 ms
17/04/10 16:30:49 INFO JobScheduler: Total delay: 169.575 s for time 1491841680000 ms (execution: 1.568 s)
17/04/10 16:30:49 ERROR JobScheduler: Error running job streaming job 1491841680000 ms.0
java.lang.NullPointerException
at org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:638)
at org.apache.spark.sql.SQLConf.defaultDataSourceName(SQLConf.scala:558)
at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:362)
at org.apache.spark.sql.SQLContext.read(SQLContext.scala:623)
at walmart.com.piadj$$anonfun$createContext$1.apply(piadj.scala:39)
at walmart.com.piadj$$anonfun$createContext$1.apply(piadj.scala:33)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:227)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:226)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
私のコード
def createContext(brokers:String,topics:String,checkpointDirectory:String):StreamingContext={
val sparkConf = new SparkConf().setAppName("pi")
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
val ssc = new StreamingContext(sc, Seconds(1))
ssc.checkpoint(checkpointDirectory)
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topicsSet)
val lines = messages.map(_._2)
lines.foreachRDD { rdd =>
if(!rdd.isEmpty()) {
import sqlContext.implicits._
val rdd2 = rdd.map(x => new JsonDeserializer().deserialize("pi_adj",x))
val rdd3 = rdd2.map(x => new String(x,"UTF-8"))
val df1 = sqlContext.read.json(rdd3)
/*some other transformations and inserting into hive*/
}
}
ssc
}
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println("Usage: streaming <brokers> <topics> <checkpointDirectory>")
System.exit(1)
}
val Array(brokers,topics,checkpointDirectory) = args
val ssc = StreamingContext.getOrCreate(checkpointDirectory,()=>createContext(brokers,topics,checkpointDirectory))
ssc.start()
ssc.awaitTermination()
}
この問題の解決策をお探しですか?ソリューションを共有していただけますか? – yoga