HBase RDDを使用して着信DStreamと結合する簡単なStreamingアプリケーションを構築しています。 サンプルコード:Apache Spark:チェックポイントから復旧中のNPE
val indexState = sc.newAPIHadoopRDD(
conf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result]).map { case (rowkey, v) => //some logic}
val result = dStream.transform { rdd =>
rdd.leftOuterJoin(indexState)
}
それは正常に動作しますが、私たちはStreamingContext のためのチェックポイントを有効にすると、アプリケーションは、以前に作成したチェックポイントから回復する をさせたときに、それは常にNullPointerExceptionがスローされます。
ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
java.lang.NullPointerException
at org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:119)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
誰もが同じ問題に直面しましたか? バージョン:
- スパーク1.6.xの
- のHadoop 2.7.35
ありがとう!
あなたは「以前に作成したチェックポイント」と言うとき、それはストリーミングジョブが停止し、再提出された意味ですか? – ImDarrenG