2017-06-15 10 views
0

私はScalaでSpark Streamingを使用しています。私はkafkaからjsonレコードを取得しています。私は値(日付と品質)とプロセスを得ることができるようにそれを解析したいと思います。私はこの依存関係を追加しましたScalaカフカからのjsonレコードの解析

case class diskQuality(datetime: String , quality : Double) extends Serializable 

def msgParse(value: String): diskQuality = { 

    import org.json4s._ 
    import org.json4s.native.JsonMethods._ 

    implicit val formats = DefaultFormats 

    val res = parse(value).extract[diskQuality] 
    return res 

} 

libraryDependencies += "org.json4s" % "json4s-native_2.10" % "3.2.4" 

記録

stream.foreachRDD(rdd => { 
    rdd.collect().foreach(i => 
    println(msgParse(i.value()).quality) 
) 
}) 

そして私は、このケースクラスと私の解析機能を持っています。ここでは

は私のコードです受信しているフォーマットは次のとおりです。

"{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}" 

は、しかし、私はこのエラーを取得する:

Exception in thread "main" org.json4s.ParserUtil$ParseException: expected field or array Near: ,\"quality\":100.0}" 

EDIT:

私はそれが動作する同じ機能を使用して、以下の解析を試み。私はscalaVersionを使用してい

val test = "{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}" 

:しかし、カフカのメッセージが同じ形式で来ていても、それはまだ同じエラーを与えるすべてのヘルプ=「2.10.6」とjson4s-native_2.10"

本当にいただければ幸い

+0

最初の書式は "{\" datetime \ ":\" 14-05-2017 14:18:30 \ "、\" quality \ ":92.6}」です。そしてあなたのコードはそれと共に動作します。 build.sbtのScala版が何であるか確認してください。 org.json4sの依存関係として2.10ですか?また、実際の値を確認するために、msgParse関数のvalueパラメータを記録することもできます。 –

+0

ご回答いただきありがとうございます。質問を編集しました。msgParse内の値は次のとおりです。 "{\" datetime \ ":\" 24-04-2017 07:53:30 \ "、\" quality \ ":100.0}" – AsmaaM

+0

@AsmaaMこれがあなたのコンソール出力であれば - 引用符がエスケープする問題があります。プロデューサーがkafkaに送るものを確認できますか? – ledniov

答えて

1

はあなたのカフカプロデューサー側の問題を持っているように、あなたはエスケープ引用符を置き換えることにより、以下のフォーマットで終わるしなければならないルックスあなたの時間をありがとう:。

{"datetime":"14-05-2017 14:18:30","quality":92.6}

正しい形式のJSON文字列を提供します。

+0

すべてが今動作します!ありがとうございました – AsmaaM