2016-06-01 2 views
0

これは動作するコードの例です:スパークスカラ:タスク直列化可能ではない

case class log(log_version: String,log_ip: String,log_from: String,SDK: String,action_time: String,action: String,sn: String,fdn: String,typ: String,vid: String,version: String,device_id: String,ip: String,timestamp: String) extends serializable 



val RDD = input.map{ line => 
    val p = line.split("\\|") 
    val log_version = p(0) 
    val log_ip = p(1) 
    val log_from = p(2) 
    val SDK = p(3) 
    val action_time = p(4) 
    val action = p(5) 
    val sn = p(6) 
    val JsonMap = if(p.length==8){ 
    val jsontest = parse(p(7), useBigDecimalForDouble = true) 
    jsontest.extract[Map[String,String]] 
    } else(Map("error" -> "empty")) 
    val fdn:String = JsonMap.get("fdn").getOrElse("null") 
    val typ:String = JsonMap.get("type").getOrElse("null") 
    val vid:String = JsonMap.get("vid").getOrElse("null") 
    val version:String = JsonMap.get("version").getOrElse("null") 
    val device_id:String = JsonMap.get("device_id").getOrElse("null") 
    val ip:String = JsonMap.get("ip").getOrElse("null") 
    val timestamp:String = JsonMap.get("timestamp").getOrElse("null") 
    log(log_version,log_ip,log_from,SDK,action_time,action,sn,fdn,typ,vid,version,device_id,ip,timestamp)}.toDF() 

私はSCにアクセスしようとするたびに、私は次のエラーを取得しています。私はここで間違って何をしていますか?

org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 

私はこのような私のコードを変更:

case class JsonLong(fdn:String,typ:String,vid:String,version:String,device_id:String,ip:String,timestamp:String) 


case class log(log_version: String,log_ip: String,log_from: String,SDK: String,action_time: String,action: String,sn: String,JsonClass:JsonLong) extends serializable 


val RDD = input.map{ line => 
    val p = line.split("\\|") 
    val log_version = p(0) 
    val log_ip = p(1) 
    val log_from = p(2) 
    val SDK = p(3) 
    val action_time = p(4) 
    val action = p(5) 
    val sn = p(6) 
    val JsonMap:JsonLong = if(p.length==8){ 
    val jsontest = parse(p(7), useBigDecimalForDouble = true) 
    val x = jsontest.extract[Map[String,String]] 
    JsonLong(x.get("fdn").getOrElse("NULL"),x.get("typ").getOrElse("NULL"),x.get("vid").getOrElse("NULL"),x.get("version").getOrElse("NULL"),x.get("fdn").getOrElse("NULL"),x.get("ip").getOrElse("NULL"),x.get("timestamp").getOrElse("NULL")) 
    } else(null) 
    log(log_version,log_ip,log_from,SDK,action_time,action,sn,JsonMap)}.toDF() 

しかし、私はまだ誤解?どうして?私はポイントを得ることはありません~~~誰かが私に教えることができますか?

答えて

2

Sparkは、各実行者に送信するためにクロージャをシリアル化できる必要があります。あなたのコードでどのようにシリアル化できないのかを推測すると、Map[String, String]を抽出するのに、implicit Formatsが必要なjson4sを使用しているようです。マップ関数内で暗黙的に宣言してみてください。

+0

私のコードを変更しましたが、まだ間違っています~~ –

+0

'JsonLong'と' log'を 'Serializable'で拡張してみてください。 scalaでは、大文字でクラスに命名するので、 'log'を' Log'に改名することを検討してください。 –

+0

あなたの '暗黙的なvalのフォーマット= DefaultFormats'(またはあなたがケースクラスの直列化のために使っているもの)はどこにありますか?これは 'map'関数の中になければなりません。それは私の答えでの意味です。 –

関連する問題