2016-07-14 7 views
0

出力をバイト配列形式で取得していますが、文字列に変換する関数はありますか?kafkaを通してストリーミングを開始するavroを送信する

stream.foreachRDD(rdd => { 
    rdd.foreach(record => parseAVROToString(record)) 
}) 
stream.print 
ssc.start() 

message- 
{ 
    "id":"1", 
    "url":"http://localhost.com", 
    "article":" xyz", 
    "timestamp": 1366150681 
} 

受信: [B 1ffc17a0

@私はそれを文字列に変換したいです。

+0

[この質問を参照](http://stackoverflow.com/questions/5250324/byte-array-to-string-and-back-issues-with-127)。エンコーディングを見てください。 – ale64bit

+0

foreachの代わりにマップを使用すると便利ですか? val res = stream.map {record => parseAVROToString(record)}そして、res – drstein

+0

の印刷はこのようには機能しません! –

答えて

0
stream.foreachRDD((rdd, time) => { 
       val sqlContext = new SQLContext(sc) 
       import sqlContext.implicits._ 

       val jsonString = rdd.map { case (k, v) => AvroUtil.parseAVROToString(v) } 
       try { 
       val result = jsonString.mapPartitions(records => { 
        val mapper = new ObjectMapper() 
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) 
        mapper.registerModule(DefaultScalaModule) 
        records.flatMap(record => { 
        try { 
         Some(mapper.readValue(record, classOf[Article])) 
        } catch { 
         case e: Exception => None 
        } 
        }) 
       }, true) 

       val df1 = result.toDF() 
       df1.show() 
       df1.write.format("json").mode(org.apache.spark.sql.SaveMode.Append).json("jsonresults") 
       //df1.save("org.apache.phoenix.spark", org.apache.spark.sql.SaveMode.Overwrite, Map("table" -> "articles".toUpperCase,"zkUrl" -> zkQuorum+":\hbase-unsecure")) 
       } catch { 
       case e: Exception => None; 
       } 
      } 
関連する問題