私はmssttをストリームとしてApache Sparkで使用しようとしています。使用するlibはapache bahir spark-sql-streaming-mqttです。 このライブラリは、paho mqttライブラリを使用しています。spark-sql-streaming-mqtt悪質なユーザーまたはパスワード
次のように私はLIBを使用しています:
val spark = SparkSession
.builder
.appName("MQTTStreamWordCount")
.master("local[4]")
.getOrCreate()
import spark.implicits._
// Create DataFrame representing the stream of input lines from connection to mqtt server
val lines = spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("clientId", "sparkTest")
.option("username", "user")
.option("password", "psw")
.option("brokerUrl", "tcp://ip:1883")
.option("topic", "/bikes")
.option("cleanSession", "true")
.load("tcp://ip:1883").as[(String, Timestamp)]
val query = lines.select("value").writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
を、私はこのエラーを取得:「不正のユーザー名やパスワードを」。
しかし、別のakka/scalaプロジェクトでは、同じユーザー/ pswを使用して同じブローカー上でpaho-mqtt libを使用しても問題ありません。
ので、私はこのエラー