2016-10-19 12 views
1

私はDataBricks(Spark 2.0.1-db1(Scala 2.11))で作業しており、Spark Streaming関数を使用しようとしています。私は図書館使用しています:
を - スパーク-SQL-ストリーミングmqtt_2.11-2.1.0-SNAPSHOT.jarに(ここを参照してください:http://bahir.apache.org/docs/spark/current/spark-sql-streaming-mqtt/を):Spark Streaming MQTT - データセットのスキーマを適用する

val lines = spark.readStream 
     .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") 
     .option("clientId", "sparkTest") 
     .option("brokerUrl", "tcp://xxx.xxx.xxx.xxx:xxx") 
     .option("topic", "/Name/data") 
     .option("localStorage", "dbfs:/models/mqttPersist") 
     .option("cleanSession", "true") 
     .load().as[(String, Timestamp)] 

次のコマンドを実行すると、私のデータセットを提供しますこのprintSchemaと

root 
|-- value : string (nullable : true) 
|-- timestamp : timestamp (nullable : true) 

そして、私は私のデータセットの「値」列にスキーマを適用したいと思います。私のjsonスキーマをfolowingとして見ることができます。私はMQTTとどのからJSONを解析する方法が表示されない、今の

root 
|-- value : struct (nullable : true) 
    |-- id : string (nullable = true) 
    |-- DateTime : timestamp (nullable = true) 
    |-- label : double (nullable = true) 
|-- timestamp : timestamp (nullable : true) 

root 
|-- id : string (nullable = true) 
|-- DateTime : timestamp (nullable = true) 
|-- label : double (nullable = true) 

はそのような何かを得るために、ストリームに直接私のJSONを解析することが可能です助けはとても素晴らしいだろう。

ありがとうございます。

答えて

0

私はこの同じ正確な問題を今日持っていました!私はjson4sとJacksonを使ってjsonを解析しました。私はストリーミングデータセット(ほとんどあなたが持っているものと同じ)取得していますどのように

val lines = spark.readStream 
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") 
    .option("topic", topic) 
    .option("brokerUrl",brokerUrl) 
    .load().as[(String,Timestamp)] 

を私はケースクラスを使用してスキーマを定義した:

JSON列を解析
case class DeviceData(devicename: String, time: Long, metric: String, value: Long, unit: String) 

使用してorg.json4s.jackson.JsonMethods.parse:

val ds = lines.map { 
    row => 
    implicit val format = DefaultFormats 
    parse(row._1).extract[DeviceData] 
} 

結果を出力:

val query = ds.writeStream 
    .format("console") 
    .option("truncate", false) 
    .start() 

結果:

+----------+-------------+-----------+-----+----+ 
|devicename|time   |metric  |value|unit| 
+----------+-------------+-----------+-----+----+ 
|dht11_4 |1486656575772|temperature|9 |C | 
|dht11_4 |1486656575772|humidity |36 |% | 
+----------+-------------+-----------+-----+----+ 

私はスパークスネイティブJSONの解析を使用して解決策を考え出すことができない種類の失望。代わりにジャクソンに頼る必要があります。ファイルをストリームとして読み込んでいる場合は、sparkのネイティブjson解析を使用できます。したがって:

val lines = spark.readStream 
    ..... 
    .json("./path/to/file").as[(String,Timestamp)] 

しかし、MQTTではこれを行うことはできません。

関連する問題