私はDataBricks(Spark 2.0.1-db1(Scala 2.11))で作業しており、Spark Streaming関数を使用しようとしています。私は図書館使用しています:
を - スパーク-SQL-ストリーミングmqtt_2.11-2.1.0-SNAPSHOT.jarに(ここを参照してください:http://bahir.apache.org/docs/spark/current/spark-sql-streaming-mqtt/を):Spark Streaming MQTT - データセットのスキーマを適用する
val lines = spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("clientId", "sparkTest")
.option("brokerUrl", "tcp://xxx.xxx.xxx.xxx:xxx")
.option("topic", "/Name/data")
.option("localStorage", "dbfs:/models/mqttPersist")
.option("cleanSession", "true")
.load().as[(String, Timestamp)]
次のコマンドを実行すると、私のデータセットを提供しますこのprintSchemaと
:
root
|-- value : string (nullable : true)
|-- timestamp : timestamp (nullable : true)
そして、私は私のデータセットの「値」列にスキーマを適用したいと思います。私のjsonスキーマをfolowingとして見ることができます。私はMQTTとどのからJSONを解析する方法が表示されない、今の
root
|-- value : struct (nullable : true)
|-- id : string (nullable = true)
|-- DateTime : timestamp (nullable = true)
|-- label : double (nullable = true)
|-- timestamp : timestamp (nullable : true)
:
root
|-- id : string (nullable = true)
|-- DateTime : timestamp (nullable = true)
|-- label : double (nullable = true)
はそのような何かを得るために、ストリームに直接私のJSONを解析することが可能です助けはとても素晴らしいだろう。
ありがとうございます。