2016-10-03 19 views
1

以下のこの基本的なSparkストリーミングコードを使用して、チェックポイントと書き込みログをテストしています。私はローカルディレクトリにチェックポイントしています。 (のCtrl - Cを使用して)アプリケーションを数回起動して停止した後、チェックポイント指示のデータが破損しているように見えるので、起動を拒否します。 I取得しています:Spark Streamingでチェックポイントデータが破損する

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 80.0 failed 1 times, most recent failure: Lost task 0.0 in stage 80.0 (TID 17, localhost): com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994 
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137) 
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670) 
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781) 
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229) 
    at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169) 
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) 
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192) 

全コード:

import org.apache.hadoop.conf.Configuration 
import org.apache.spark._ 
import org.apache.spark.streaming._ 

object ProtoDemo { 
    def createContext(dirName: String) = { 
    val conf = new SparkConf().setAppName("mything") 
    conf.set("spark.streaming.receiver.writeAheadLog.enable", "true") 

    val ssc = new StreamingContext(conf, Seconds(1)) 
    ssc.checkpoint(dirName) 
    val lines = ssc.socketTextStream("127.0.0.1", 9999) 
    val words = lines.flatMap(_.split(" ")) 
    val pairs = words.map(word => (word, 1)) 
    val wordCounts = pairs.reduceByKey(_ + _) 
    val runningCounts = wordCounts.updateStateByKey[Int] { 
     (values: Seq[Int], oldValue: Option[Int]) => 
     val s = values.sum 
     Some(oldValue.fold(s)(_ + s)) 
     } 

    // Print the first ten elements of each RDD generated in this DStream to the console 
    runningCounts.print() 
    ssc 
    } 

    def main(args: Array[String]) = { 
    val hadoopConf = new Configuration() 
    val dirName = "/tmp/chkp" 
    val ssc = StreamingContext.getOrCreate(dirName,() => createContext(dirName), hadoopConf) 
    ssc.start() 
    ssc.awaitTermination() 
    } 

} 
+0

hdfsのような信頼できるファイルシステムを使用して、エラーがないかどうかを確認してください。 – Knight71

+0

S3で試したが、それでも起こる。私は破損が先行ログで起こると信じています。 – thesamet

+0

あなたはどのバージョンのスパークを使用していますか? 1.6または2.0? –

答えて

0

を基本的に何をしようとすることは、あなたが実行しているクラスタに基づいて、これが機能するために、ドライバの障害のシナリオであるあなたが従わなければなりません以下の手順でドライバプロセスを監視し、失敗した場合はドライバを再起動します。

アプリケーションドライバの自動再起動の設定 - ドライバの障害から自動的に回復するには、ストリーミングアプリケーションを実行するために、 eドライバの処理を行い、失敗した場合はドライバを再起動します。 cluster managersには、これを達成するためのさまざまなツールがあります。

  1. スパークスタンドアロン - スパークアプリケーションドライバは(cluster deploy mode参照)スパークスタンドアロンクラスタ内 ランに提出することができる、つまり、アプリケーション・ドライバ自体は ワーカーノードのいずれかで実行されます。さらに、スタンドアロンクラスタマネージャは、ドライバを監視するように指示された にすることができ、ドライバ がゼロ以外の終了コードまたはドライバを実行している ノードの障害のために失敗した場合に再起動することができます。詳細については、クラスタモードを参照し、Spark スタンドアロンguideを参照してください。

  2. YARN - Yarnは、アプリケーションを自動的に再起動するための同様のメカニズムをサポートしています。 の詳細については、YARNのドキュメントを参照してください。

  3. Mesos-Marathonは、これをMesosで使用するために使用されています。

以下のように先読みログを設定する必要があります。あなたが従う必要があるS3の特別な手順があります。

先の書き込みのログをS3(またはフラッシングをサポートしていない任意のファイルシステム)を使用している間、

spark.streaming.driver.writeAheadLog.closeFileAfterWrite spark.streaming.receiver.writeAheadLogを有効にすることを忘れないでください。 closeFileAfterWrite。

詳細については、スパークStreaming Configurationを参照してください。

+0

私が説明する問題は、失敗後に手動で再起動した場合に発生します。 2〜3回目を再起動すると、その質問に記載されている例外が発生します。自動再起動は何か違いはありますか? – thesamet

+0

これらをtrueに設定しましたか? 。 –

+0

はい、S3でテストしてもこの問題が発生しました。ローカルファイルシステムに必要ですか? – thesamet

0

問題は、チェックポイントの破損よりもKryoシリアライザの問題と思われます。 コード例(GitHubプロジェクトを含む)では、Kryoシリアル化が構成されていません。 構成されていないため、KryoException例外が発生しない可能性があります。

"write ahead logs"を使用してディレクトリから復元すると、そこからすべてのSpark設定が取得されます。 例では、createContextメソッドは、チェックポイントから開始するとき呼び出されません。

別のアプリケーションは、以前に同じチェックポイントディレクトリ(Kryoシリアライザが設定されている場所)でテストされていると仮定します。 現在のアプリケーションをそのチェックポイントから復元できません。

関連する問題