1
を含むログです:は、ここでJSON
"V2|ip-10-203-5-16|PSDK_KINESIS_LOG|PSDK_DEFAULT|2016-05-12 00:00:05|VP|aa2ddfdb-4387-49c0-9651-92cc83b8e905|{"vid":"2237031","fdn":"FDNB1995115","type":"0","version":"1.0","device_id":"SCL-TL00_a0-8d-16-e6-39-13_868145026279574","ip":"58.254.0.157","timestamp":1462982395}"
は私のコード:
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.{SparkContext, SparkConf}
import org.slf4j.LoggerFactory
class JsonLong{
@JsonProperty var fdn: String = null
@JsonProperty("type") var typ: String = null
@JsonProperty var vid: String = null
@JsonProperty var version: String = null
@JsonProperty var device_id: String = null
@JsonProperty var ip: String = null
@JsonProperty var timestamp: Long = 0L
override def toString = s"JsonLong(fdn=$fdn, typ=$typ, vid=$vid,version=$version, device_id=$device_id, ip=$ip, timestamp=$timestamp)"
}
def jsonString(args:String):JsonLong ={
val mapper = new ObjectMapper()
val record = mapper.readValue(args, classOf[JsonLong])
record
}
case class log(log_version: String,log_ip: String,log_from: String,SDK: String,action_time: Date,action: String,sn: String,post_code: JsonLong)
val df = new SimpleDateFormat("yyyy-mm-dd HH:mm:ss")
val input = sc.textFile("input.snappy")
val RDD = input.map { line =>
val p = line.split("\\|")
val log_version = p(0)
val log_ip = p(1)
val log_from = p(2)
val SDK = p(3)
val action_time = df.parse(p(4))
val action = p(5)
val sn = p(6)
val post_code = if(p.length==8){
//to read the last JSON
jsonString(p(7))
} else("null")
log(log_version,log_ip,log_from,SDK,new Date(action_time.getTime()),action,sn,post_code)}.toDF()
私は最後のJSONに問題があります。私はすでにjsonStringをdefで返していますが、Objectを返します。最後のJsonに対処するには? post_code
を初期化するために使用
ありがとうございます!しかし、ここではまだcom.fasterxml.jackson.databind.JsonMappingExceptionを返すという問題があります。タイプ[simple type、class $ line45。$ read $$ iwC $$ iwC $ JsonLong]に適切なコンストラクタが見つかりません:JSONからインスタンス化できませんオブジェクト(型情報を追加/有効にする必要がありますか?) –
あなた自身がそれを理解しなければならないので、SOはデバッグサービスではありません... –
ありがとう。私はちょうどScalaを学び始めます~~私は修正するために最善を尽くしますそれ。 –