2016-11-09 1 views
2

私は例外を思い付く:スパークカフカプロデューサー、シリアライズ

ERRORのyarn.ApplicationMaster:Userクラスの例外がスローされました: org.apache.spark.SparkException:タスク直列化可能ではない org.apache.spark.SparkException : でシリアル化できないタスクorg.apache.spark.util.ClosureCleaner $ .ensureSerializable(ClosureCleaner.scala:304) at org.apache.spark.util.ClosureCleaner $ .org $ apache $ spark $ util $ ClosureCleaner $$ clean(ClosureCleaner.scala:294) at org.apache.spark.util.ClosureCleaner $ .clean(ClosureCleaner.scala:122) at org.apache.spark.SparkContext.clean(SparkContext.scala:2032) org.apache.spark.rdd.RDD $$ anonfun $ foreach $ 1.apply(RDD.scala:889)at org.apache。 spark.rdd.RDD $$ anonfun $ foreach $ 1.apply(RDD.scala:888) org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:147) at org.apache.spark。 rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:108) at org.apache.spark.rdd.RDD.withScope(RDD.scala:306) org.apache.spark.rdd.RDD.foreach(RDD.scala :888) com.Boot $ .test(Boot.scala:60)com.Boot $ .main(Boot.scala:36) com.Boot.main(Boot.scala) sun.reflect。 NativeMethodAccessorImpl.invoke0(ネイティブメソッド) sun.reflect.DelegatingMethodAccessorImpl.invoke java.lang.reflect.Method.invokeで(DelegatingMethodAccessorImpl.java:43) (Method.java:606)でsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) org.apache.spark.deploy.yarn.ApplicationMaster $$ anon $ 2.run(ApplicationMaster.scala:525) 原因:java.io.NotSerializableException: org.apache.kafka.clients.producer.KafkaProducerシリアライゼーションスタック: - オブジェクトは直列化できません(クラス:org.apache.kafka.clients.producer.KafkaProducer、値: [email protected]) - フィールド(クラス:com.Boot $$ anonfun $ test $ 1、name:プロデューサ$ 1、タイプ:class org.apache.kafka.clients.producer.KafkaProducer) - org.apache.spark.serializer.SerializationDebugger $ .improveException(serializationDebugger.scala:40)のオブジェクト(クラスcom.Boot $$ anonfun $ test $ 1) org.apache.spark.serializer.JavaSerializationStream.writeObject (JavaSerializer.scala:47): org.apache.spark.util.ClosureCleaner $ .ensureSerializable(ClosureCleaner.scala:301)で org.apache.spark.serializer.JavaSerializerInstance.serialize(84 JavaSerializer.scala)で このプログラムで

// @transient 
val sparkConf = new SparkConf() 

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 

// @transient 
val sc = new SparkContext(sparkConf) 

val requestSet: RDD[String] = sc.textFile(s"hdfs:/user/bigdata/ADVERTISE-IMPRESSION-STAT*/*") 

// @transient 
val props = new HashMap[String, Object]() 
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, NearLineConfig.kafka_brokers) 
// props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); 
// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); 
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") 
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") 
props.put("producer.type", "async") 
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "49152") 

// @transient 
val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props) 

requestSet.foreachPartition((partisions: Iterator[String]) => { 
    partisions.foreach((line: String) => { 
    try { 
     producer.send(new ProducerRecord[String, String]("testtopic", line)) 
    } catch { 
     case ex: Exception => { 
     log.warn(ex.getMessage, ex) 
     } 
    } 
    }) 
}) 

producer.close() 

私は、HDFSパスからレコードを読み、カフカにそれらを保存しよう。 問題は、レコードをkafkaに送信するコードを削除したときにうまく動作することです。 私が逃したものは?

答えて

2

KafkaProducerはシリアル化できません。

requestSet.foreachPartition((partisions: Iterator[String]) => { 
    val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props) 
    partisions.foreach((line: String) => { 
    try { 
     producer.send(new ProducerRecord[String, String]("testtopic", line)) 
    } catch { 
     case ex: Exception => { 
     log.warn(ex.getMessage, ex) 
     } 
    } 
    }) 
}) 

KafkaProducer.sendその戻りFuture[RecordMetadata]、およびキーまたは値をシリアライズすることができないならば、それはSerializationExceptionあるから伝播することができる唯一の例外:あなたはforeachPartition内側にインスタンスの作成を移動する必要があります。

+0

ありがとうございます。私は自分のコードを変更するあなたのやり方をしました。できます 。 –

+0

@ Steven.Prgm彼は何を言いましたか? –

+0

ところで、私の仲間のひとりがドライバー変数とエグゼキュータ変数について言及しましたが、その違いは何ですか? –