
I recently started using Spark Streaming and implemented checkpointing, storing the checkpoints in HDFS. When the streaming job fails I can restart from the last checkpoint, but it then hits a NullPointerException and the streaming job is killed. I can see the checkpoint in HDFS, so I don't understand why the exception is thrown even though the checkpoint is there. Any input would be appreciated. Why does the streaming job fail with a NullPointerException when restarting from the checkpoint? The log from the restart is below:

17/04/10 16:30:47 INFO JobGenerator: Batches during down time (2 batches):1491841680000 ms, 1491841800000 ms 
17/04/10 16:30:47 INFO JobGenerator: Batches pending processing (0 batches): 
17/04/10 16:30:47 INFO JobGenerator: Batches to reschedule (2 batches): 1491841680000 ms, 1491841800000 ms 
17/04/10 16:30:48 INFO JobScheduler: Added jobs for time 1491841680000 ms 
17/04/10 16:30:48 INFO JobScheduler: Starting job streaming job 1491841680000 ms.0 from job set of time 1491841680000 ms 
17/04/10 16:30:48 INFO SparkContext: Starting job: isEmpty at piadj.scala:34 
17/04/10 16:30:48 INFO DAGScheduler: Got job 0 (isEmpty at piadj.scala:34) with 1 output partitions 
17/04/10 16:30:48 INFO DAGScheduler: Final stage: ResultStage 0 (isEmpty at piadj.scala:34) 
17/04/10 16:30:48 INFO DAGScheduler: Parents of final stage: List() 
17/04/10 16:30:48 INFO DAGScheduler: Missing parents: List() 
17/04/10 16:30:48 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at piadj.scala:32), which has no missing parents 
17/04/10 16:30:48 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 4.1 KB, free 4.1 KB) 
17/04/10 16:30:48 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.1 KB, free 6.1 KB) 
17/04/10 16:30:48 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.26.118.23:35738 (size: 2.1 KB, free: 5.8 GB) 
17/04/10 16:30:48 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1008 
17/04/10 16:30:48 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at piadj.scala:32) 
17/04/10 16:30:48 INFO YarnClusterScheduler: Adding task set 0.0 with 1 tasks 
17/04/10 16:30:48 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, oser402370.wal-mart.com, partition 0,ANY, 2108 bytes) 
17/04/10 16:30:48 INFO JobScheduler: Added jobs for time 1491841800000 ms 
17/04/10 16:30:48 INFO RecurringTimer: Started timer for JobGenerator at time 1491841920000 
17/04/10 16:30:48 INFO JobGenerator: Restarted JobGenerator at 1491841920000 ms 
17/04/10 16:30:48 INFO JobScheduler: Starting job streaming job 1491841800000 ms.0 from job set of time 1491841800000 ms 
17/04/10 16:30:48 INFO JobScheduler: Started JobScheduler 
17/04/10 16:30:48 INFO StreamingContext: StreamingContext started 
17/04/10 16:30:48 INFO SparkContext: Starting job: isEmpty at piadj.scala:34 
17/04/10 16:30:48 INFO DAGScheduler: Got job 1 (isEmpty at piadj.scala:34) with 1 output partitions 
17/04/10 16:30:48 INFO DAGScheduler: Final stage: ResultStage 1 (isEmpty at piadj.scala:34) 
17/04/10 16:30:48 INFO DAGScheduler: Parents of final stage: List() 
17/04/10 16:30:48 INFO DAGScheduler: Missing parents: List() 
17/04/10 16:30:48 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[3] at map at piadj.scala:32), which has no missing parents 
17/04/10 16:30:48 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.1 KB, free 10.2 KB) 
17/04/10 16:30:48 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.1 KB, free 12.3 KB) 
17/04/10 16:30:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.26.118.23:35738 (size: 2.1 KB, free: 5.8 GB) 
17/04/10 16:30:48 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1008 
17/04/10 16:30:48 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[3] at map at piadj.scala:32) 
17/04/10 16:30:48 INFO YarnClusterScheduler: Adding task set 1.0 with 1 tasks 
17/04/10 16:30:48 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, XXXXXXX, partition 0,ANY, 2108 bytes) 
17/04/10 16:30:48 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on XXXXXXX (size: 2.1 KB, free: 4.3 GB) 
17/04/10 16:30:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on XXXXXXX (size: 2.1 KB, free: 4.3 GB) 
17/04/10 16:30:49 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1142 ms on XXXXXXX (1/1) 
17/04/10 16:30:49 INFO YarnClusterScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 
17/04/10 16:30:49 INFO DAGScheduler: ResultStage 0 (isEmpty at piadj.scala:34) finished in 1.151 s 
17/04/10 16:30:49 INFO DAGScheduler: Job 0 finished: isEmpty at piadj.scala:34, took 1.466449 s 
17/04/10 16:30:49 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 979 ms on XXXXXXX (1/1) 
17/04/10 16:30:49 INFO YarnClusterScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool 
17/04/10 16:30:49 INFO DAGScheduler: ResultStage 1 (isEmpty at piadj.scala:34) finished in 0.983 s 
17/04/10 16:30:49 INFO DAGScheduler: Job 1 finished: isEmpty at piadj.scala:34, took 1.006658 s 
17/04/10 16:30:49 INFO JobScheduler: Finished job streaming job 1491841680000 ms.0 from job set of time 1491841680000 ms 
17/04/10 16:30:49 INFO JobScheduler: Total delay: 169.575 s for time 1491841680000 ms (execution: 1.568 s) 
17/04/10 16:30:49 ERROR JobScheduler: Error running job streaming job 1491841680000 ms.0 
java.lang.NullPointerException 
at org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:638) 
at org.apache.spark.sql.SQLConf.defaultDataSourceName(SQLConf.scala:558) 
at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:362) 
at org.apache.spark.sql.SQLContext.read(SQLContext.scala:623) 
at walmart.com.piadj$$anonfun$createContext$1.apply(piadj.scala:39) 
at walmart.com.piadj$$anonfun$createContext$1.apply(piadj.scala:33) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
at scala.util.Try$.apply(Try.scala:161) 
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:227) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227) 
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:226) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 

My code:

def createContext(brokers: String, topics: String, checkpointDirectory: String): StreamingContext = {
  val sparkConf = new SparkConf().setAppName("pi")
  val sc = new SparkContext(sparkConf)
  val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
  sqlContext.setConf("hive.exec.dynamic.partition", "true")
  sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
  val ssc = new StreamingContext(sc, Seconds(1))
  ssc.checkpoint(checkpointDirectory)
  val topicsSet = topics.split(",").toSet
  val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
  val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topicsSet)
  val lines = messages.map(_._2)
  lines.foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      import sqlContext.implicits._
      val rdd2 = rdd.map(x => new JsonDeserializer().deserialize("pi_adj", x))
      val rdd3 = rdd2.map(x => new String(x, "UTF-8"))
      val df1 = sqlContext.read.json(rdd3)
      /* some other transformations and inserting into Hive */
    }
  }
  ssc
}

def main(args: Array[String]) {
  if (args.length < 3) {
    System.err.println("Usage: streaming <brokers> <topics> <checkpointDirectory>")
    System.exit(1)
  }
  val Array(brokers, topics, checkpointDirectory) = args
  val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => createContext(brokers, topics, checkpointDirectory))
  ssc.start()
  ssc.awaitTermination()
}

Did you find a solution to this problem? Could you share the solution? – yoga

Answers


TL;DR: Move the code that creates the Kafka DStream and the foreach outside createContext and use it in main. According to the scaladoc of StreamingContext:

Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. If checkpoint data exists in the provided checkpointPath, then StreamingContext will be recreated from the checkpoint data. If the data does not exist, then the StreamingContext will be created by calling the provided creatingFunc. Given that, creatingFunc should probably only create a new StreamingContext with checkpointing enabled.

And, although it may not be stated explicitly, creatingFunc should do nothing else.

You should move the code that creates the Kafka DStream and the foreachRDD out of createContext and have it as part of main (right after ssc is initialized and before you start it).
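
A minimal, untested sketch of what that restructuring could look like, reusing the names from the question's code (the body of foreachRDD is left as a placeholder for the original JSON parsing and Hive insert):

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object piadj {

  // creatingFunc now only builds a StreamingContext with checkpointing enabled
  def createContext(checkpointDirectory: String): StreamingContext = {
    val sparkConf = new SparkConf().setAppName("pi")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(checkpointDirectory)
    ssc
  }

  def main(args: Array[String]) {
    val Array(brokers, topics, checkpointDirectory) = args

    // Recreate the context from the checkpoint if it exists, otherwise call createContext
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => createContext(checkpointDirectory))

    // The Kafka DStream and the output operation are declared here in main,
    // after ssc exists and before it is started
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaParams, topics.split(",").toSet)

    messages.map(_._2).foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // Placeholder: the original transformations (JSON deserialization, Hive insert)
        // would go here; any SQLContext/HiveContext would also be obtained here rather
        // than captured from createContext
        println(s"records in batch: ${rdd.count()}")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Whether this alone removes the NullPointerException is not confirmed here; the point is that the graph-building code runs in main on every start, while createContext only supplies a fresh, checkpoint-enabled context when no checkpoint exists.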


Should the checkpointing be moved outside of 'createContext'? What should remain inside 'createContext'? – pharpan


I do this outside 'createContext', but I'm checking what happens with it inside. I think both should work. –


@JacekLaskowski Are you recommending that the DStream and its associated operations (i.e. foreachRDD) be defined outside of 'creatingFunc'? When the streaming application is recovered from a failure, won't the previously created DStream operations fail to be restored, so that they have to be declared again? – autodidacticon
