import org.apache.spark.sql.{SQLContext, SaveMode}
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

stream.foreachRDD((rdd, time) => {
  // Reuse a lazily created singleton instead of building a new SQLContext per micro-batch
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  val jsonString = rdd.map { case (k, v) => AvroUtil.parseAVROToString(v) }
  try {
    val result = jsonString.mapPartitions(records => {
      // One ObjectMapper per partition: it is costly to construct and not serializable
      val mapper = new ObjectMapper()
      mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
      mapper.registerModule(DefaultScalaModule)
      records.flatMap(record => {
        try {
          Some(mapper.readValue(record, classOf[Article]))
        } catch {
          case e: Exception => None // drop records that fail to deserialize
        }
      })
    }, preservesPartitioning = true)

    val df1 = result.toDF()
    df1.show()
    df1.write.mode(SaveMode.Append).json("jsonresults")
    //df1.save("org.apache.phoenix.spark", SaveMode.Overwrite,
    //  Map("table" -> "articles".toUpperCase, "zkUrl" -> (zkQuorum + ":/hbase-unsecure")))
  } catch {
    case e: Exception => e.printStackTrace() // surface batch failures instead of silently swallowing them
  }
})
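For `mapper.readValue(record, classOf[Article])` and `result.toDF()` to compile, an `Article` case class must be in scope. Its fields are not shown in the question, so the ones below are purely illustrative:

```scala
// Hypothetical shape; replace the fields with those of the real JSON payload.
// Defined at top level so Spark's toDF() implicits can derive a schema for it.
case class Article(id: String, title: String, body: String)
```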
[See this question](http://stackoverflow.com/questions/5250324/byte-array-to-string-and-back-issues-with-127). Take a look at the encoding. – ale64bit
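The encoding point matters if the Avro payload arrives as raw bytes: decoding with the platform default charset can corrupt anything outside ASCII. The internals of `AvroUtil.parseAVROToString` are not shown in the question, so this is only a sketch of the general pitfall:

```scala
import java.nio.charset.StandardCharsets

// Decoding with an explicit charset round-trips safely;
// `new String(bytes)` uses the platform default and may not.
val bytes: Array[Byte] = Array(0xE2.toByte, 0x82.toByte, 0xAC.toByte) // UTF-8 for "€"
val text = new String(bytes, StandardCharsets.UTF_8)
assert(text == "\u20ac")
assert(text.getBytes(StandardCharsets.UTF_8).sameElements(bytes))
```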
Would it help to use map instead of foreach? `val res = stream.map { record => parseAVROToString(record) }` and then print res – drstein
Printing it like that doesn't work! –
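For reference, a sketch of the map-based variant drstein suggests, assuming `stream` is a `DStream[(String, Array[Byte])]` as in the snippet above. Note that a DStream is printed with its own `print()` output operation, not a plain `println`, which is likely why printing "doesn't work like this":

```scala
import org.apache.spark.streaming.dstream.DStream

// Transform every record of each micro-batch; nothing executes until an
// output operation such as print() is registered on the stream.
val res: DStream[String] = stream.map { case (k, v) => AvroUtil.parseAVROToString(v) }
res.print() // prints the first 10 elements of each batch to the driver log
```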