以下のこの基本的なSparkストリーミングコードを使用して、チェックポイントと書き込みログをテストしています。私はローカルディレクトリにチェックポイントしています。 (のCtrl - Cを使用して)アプリケーションを数回起動して停止した後、チェックポイント指示のデータが破損しているように見えるので、起動を拒否します。 I取得しています:Spark Streamingでチェックポイントデータが破損する
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 80.0 failed 1 times, most recent failure: Lost task 0.0 in stage 80.0 (TID 17, localhost): com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
全コード:
import org.apache.hadoop.conf.Configuration
import org.apache.spark._
import org.apache.spark.streaming._
object ProtoDemo {
def createContext(dirName: String) = {
val conf = new SparkConf().setAppName("mything")
conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint(dirName)
val lines = ssc.socketTextStream("127.0.0.1", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
val runningCounts = wordCounts.updateStateByKey[Int] {
(values: Seq[Int], oldValue: Option[Int]) =>
val s = values.sum
Some(oldValue.fold(s)(_ + s))
}
// Print the first ten elements of each RDD generated in this DStream to the console
runningCounts.print()
ssc
}
def main(args: Array[String]) = {
val hadoopConf = new Configuration()
val dirName = "/tmp/chkp"
val ssc = StreamingContext.getOrCreate(dirName,() => createContext(dirName), hadoopConf)
ssc.start()
ssc.awaitTermination()
}
}
hdfsのような信頼できるファイルシステムを使用して、エラーがないかどうかを確認してください。 – Knight71
S3で試したが、それでも起こる。私は破損が先行ログで起こると信じています。 – thesamet
あなたはどのバージョンのスパークを使用していますか? 1.6または2.0? –