2017-02-04 5 views
1

スパークストリーミング受信機ベースのアプローチを使用していますが、データの消失問題を解決するためにチェックポインティングを有効にしました。スパークストリーミングチェックポインティングスローではシリアル化できない例外

スパークバージョンは1.6.1で、カフカのトピックからのメッセージを受け取ります。

私はsscを内部で使用しています。foreachRDDの方法はDStreamです。したがって、シリアル化できない例外がスローされます。

私はクラスSerializableを拡張しようとしましたが、それでも同じエラーです。私たちがチェックポイントを有効にしたときだけ起こっています。

コードは:

def main(args: Array[String]): Unit = { 

    val checkPointLocation = "/path/to/wal" 
    val ssc = StreamingContext.getOrCreate(checkPointLocation,() => createContext(checkPointLocation)) 
    ssc.start() 
    ssc.awaitTermination() 
    } 

    def createContext (checkPointLocation: String): StreamingContext ={ 

     val sparkConf = new SparkConf().setAppName("Test") 
     sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true") 
     val ssc = new StreamingContext(sparkConf, Seconds(40)) 
     ssc.checkpoint(checkPointLocation) 
     val sc = ssc.sparkContext 
     val sqlContext: SQLContext = new HiveContext(sc) 
     val kafkaParams = Map("group.id" -> groupId, 
     CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> sasl, 
     ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer", 
     ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer", 
     "metadata.broker.list" -> brokerList, 
     "zookeeper.connect" -> zookeeperURL) 
     val dStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2) 
     dStream.foreachRDD(rdd => 
     { 
      // using sparkContext/sqlContext to do any operation throws error. 
      // convert RDD[String] to RDD[Row] 
      //Create Schema for the RDD. 
      sqlContext.createDataFrame(rdd, schema) 
     }) 
     ssc 
    } 

エラーログ:

2017年2月8日22:53:53250 ERROR [ドライバー] streaming.StreamingContext:コンテキストを起動 エラー、マーキング停止しているとして java.io.NotSerializableException:DStreamチェックポイントは になっていますが、その機能を持つDStreamはシリアル化できません org.apache.spark.SparkCo ntextシリアル化スタック: - オブジェクトは直列化できません(クラス:org.apache.spark.SparkContext、値: [email protected]) - フィールド(クラス:com.x.payments.RemedyDriver $$ anonfun $ (クラス$ org.apache.spark.SparkContext) - オブジェクト(クラスcom.x.payments.RemedyDriver $$ anonfun $ main $ 1) - フィールド(クラス:org.apache.spark 。streaming.dstream.Dstream.Dstream.D $ $ anonfun $ foreachRDD $ 1 $$ anonfun $ apply $ mcV $ sp $ 3、 名前:cleanedF $ 1、タイプ:インタフェースscala.Function1) - オブジェクト(クラスorg.apache.spark.streaming.dstream .DStream $$ anonfun $ foreachRDD $ 1 $$ anonfun $ apply $ mcV $ sp $ 3、 ) - writeObjectデータ(クラス:org.apache.spark.streaming.dstream.DStream) - オブジェクト(cl配列の要素(インデックス:0) - 配列(クラス[Ljava.lang.Object;]);[email protected]) 、size 16) - フィールド(クラス:scala.collection.mutable.ArrayBuffer、名前:配列、タイプ:クラス[Ljava.lang.Object]) - オブジェクト(クラスscala.collection.mutable.ArrayBuffer、ArrayBuffer(org。 (クラス[email protected])) - writeObjectデータ(クラス:org.apache.spark.streaming.dstream.DStreamCheckpointData) - オブジェクト(クラスorg.apache.spark.streaming.dstream.DStreamCheckpointData、 0 チェックポイントファイル

] - writeObject data(クラス:org.apache.spark.streaming.dstream.DStream) - オブジェクト(クラスorg.apache.spark.streaming.kafka.KafkaInputDStream、 [email protected] )配列の要素() - 配列(クラス[Ljava.lang.Object]、サイズ16) - フィールド(クラス:scala.collection.mutable.ArrayBuffer、名前:配列、型:クラス[Ljava .lang.Object;) - オブジェクト(クラススカラー。オブジェクト。 .DStreamGraph) - オブジェクト(クラスorg.apache.spark.streaming.DStreamGraph、org.apache.spark.streaming。フィールド(クラス:org.apache.spark.streaming.Checkpoint、name:グラフ、型:class org.apache.spark.streaming.DStreamGraph) - オブジェクト(クラスorg.apache.spark.streaming。チェックポイント、[email protected]) at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:557) at org.apache.spark.streaming.StreamingContext.liftedTree1 $ 1(StreamingContext .scala:601) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600) at com.x.payments.RemedyDriver $ .main(RemedyDriver.scala:104) at com.x. payments.RemedyDriver.main(RemedyDriver.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(ネイティブメソッド) at sun.reflec t.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)で java.lang.reflect.Method.invoke(Method.java:498)で ORGで 。 apache.spark.deploy.yarn.ApplicationMaster $$ anon $ 2.run(ApplicationMaster.scala:559) 2017-02-08 22:53:53,250エラー[ドライバ] payments.RemedyDriver $:DStream チェックポイントが有効になっていますが、その機能とDStreams がシリアライズorg.apache.spark.SparkContextシリアライズ スタックされない: は - (クラス:org.apache.spark.SparkContext、値: [email protected])シリアライズしないオブジェクト を - フィールド(クラス:com.x.payments.RemedyDriver $$ a (クラスcom.payments.RemedyDriver $$ anonfun $ main $ 1) - フィールド(クラス:org.apache .spark.streaming.dstream.DStream $$ anonfun $ foreachRDD $ 1 $$ anonfun $ apply $ mcV $ sp $ 3、 名前:cleanedF $ 1、タイプ:interface scala.Function1) - オブジェクト(クラスorg.apache.spark.streaming (クラスorg.apache.spark.streaming.dstream.DStream) - オブジェクト(クラスorg.apache())。 .spark.streaming.dstream.ForEachDStream、 [email protected]) - 配列の要素(インデックス:0) - 配列(クラス[Ljava.lang.Object; sizeオブジェクト(クラスscala.collection.mutable.ArrayBuffer、ArrayBuffer(org.apache。))。このクラスは、クラスのインスタンスを作成します。 (クラスorg.apache.spark.streaming.dstream.DefaultCheckpointData) - オブジェクト(クラスorg.apache.spark.streaming.dstream.DStreamCheckpointData、[0 チェックポイントファイル

]) - writeObjectからデータ(クラス:org.apache.spark.streaming.dstream.DStream) - オブジェクト(クラスorg.apache.spark.streaming.kafka.KafkaInputDStream、 org.apache.spark .streaming.kafka.KafkaInputDStream @ acd8e32 )配列の要素() - 配列(クラス[Ljava.lang.Object]、サイズ16) - フィールド(クラス:scala.collection.mutable.ArrayBuffer、名前:配列、型:クラス[Ljava .lang.Object;) - オブジェクト(クラススカラー。オブジェクト。 .DStreamGraph) - オブジェクト(クラスorg.apache.spark.streaming.Checkpoint、name:graph、org.apache.spark.streaming.DStreamGraph)タイプ:クラスorg.apache.spark.streaming.DStreamGraph) - オブジェクト(クラスorg.apache.spark.streaming.Checkpoint、org.apache.spark.streaming。484bf033 @チェックポイント)2017年2月8日 22:53:53255 INFO [ドライバ] yarn.ApplicationMaster:最終的なアプリのステータス: が成功し、終了コード:0

更新:基本的に

私たちrddをDF(foreachRDDメソッドのDStream内)に変換した後、その上にDF APIを適用し、最終的にCassandraにデータを格納します。そこでsqlContextを使用してrddをDFに変換し、その時にはエラーをスローします。あなたがSparkContextにアクセスしたい場合

+0

foreachRDDの内部で何が行われているかを表示できますか? atleast a sample –

+0

@Yuval Itzchakov:この問題で私を助けてくれますか? – Shankar

答えて

3

rdd値を経由して、そう:

dStream.foreachRDD(rdd => { 
    val sqlContext = new HiveContext(rdd.context) 
    val dataFrameSchema = sqlContext.createDataFrame(rdd, schema) 
} 

この:

dStream.foreachRDD(rdd => { 
    // using sparkContext/sqlContext to do any operation throws error. 
    val numRDD = sc.parallelize(1 to 10, 2) 
    log.info("NUM RDD COUNT:"+numRDD.count()) 
} 

「はそれができる、SparkContextが閉鎖にシリアル化する原因になっていますそれは直列化できないからです。

+0

私は自分の質問を更新しました。どうぞご覧ください。 – Shankar

+0

@ Shankar DataFramesを使用する場合は、Structured Streaming APIを使用しないのはなぜですか? –

+0

createStreamまたはcreateDirectStreamを使用できないということですか?これはチェックポインティングを有効にするときにのみ起こります..また、構造化ストリーミングはSPark 1.6.1で動作しますか? – Shankar

関連する問題