0

ApacheSpark Structured StreamをMQTTトピック(この場合はIBM BluemixのIBM Watson IoTプラットフォーム)に接続しようとしています。ApacheBahirのスキーマに関する問題ApacheSparkストリーミングのStuctured Streamingコネクタ

次のように私は構造化されたストリームを作成しています:、これまでのところは良い

val df = spark.readStream 
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") 
    .option("username","<username>") 
    .option("password","<password>") 
    .option("clientId","a:vy0z2s:a-vy0z2s-zfzzckrnqf") 
    .option("topic", "iot-2/type/WashingMachine/id/Washer02/evt/voltage/fmt/json") 
    .load("tcp://vy0z2s.messaging.internetofthings.ibmcloud.com:1883") 

を次のようにREPLで、私はこのDFオブジェクト取り戻す:

df: org.apache.spark.sql.DataFrame = [value: string, timestamp: timestamp]

しかし、私は読むことを開始した場合私は次のエラーを取得する

val query = df.writeStream 
    .outputMode("append") 
    .format("console") 
    .start() 

このラインを使用してストリームから

scala> 17/02/03 07:32:23 ERROR StreamExecution: Query query-1 terminated with error java.lang.ClassCastException: scala.Tuple2 cannot be cast to scala.runtime.Nothing$ at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$3.apply(MQTTStreamSource.scala:156) at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$3.apply(MQTTStreamSource.scala:156) at scala.collection.MapLike$class.getOrElse(MapLike.scala:128) at scala.collection.concurrent.TrieMap.getOrElse(TrieMap.scala:633) at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply$mcZI$sp(MQTTStreamSource.scala:156) at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:155) at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:155) at scala.collection.immutable.Range.foreach(Range.scala:160) at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.getBatch(MQTTStreamSource.scala:155) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:332) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:329) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:329) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:194) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:184) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:120) 17/02/03 07:32:24 WARN MQTTTextStreamSource: Connection to mqtt server lost. Connection lost (32109) - java.io.EOFException at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:146) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.EOFException at java.io.DataInputStream.readByte(DataInputStream.java:267) at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:65) at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:107) ... 1 more 17/02/03 07:32:28 WARN MQTTTextStreamSource: Connection to mqtt server lost.

私の直感では、スキーマに問題があると言うので、私は1を追加しました:

import org.apache.spark.sql.types._ val 
schema = StructType(
    StructField("count",LongType,true):: 
    StructField("flowrate",LongType,true):: 
    StructField("fluidlevel",StringType,true):: 
    StructField("frequency",LongType,true):: 
    StructField("hardness",LongType,true):: 
    StructField("speed",LongType,true):: 
    StructField("temperature",LongType,true):: 
    StructField("ts",LongType,true):: 
    StructField("voltage",LongType,true):: Nil) 

val df = spark.readStream 
    .schema(schema) 
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") 
    .option("username","<username>") 
    .option("password","<password>") 
    .option("clientId","a:vy0z2s:a-vy0z2s-zfzzckrnqf") 
    .option("topic", "iot-2/type/WashingMachine/id/Washer02/evt/voltage/fmt/json") 
    .load("tcp://vy0z2s.messaging.internetofthings.ibmcloud.com:1883") 

しかし、これは、任意のアイデアを助けていないのですか?

+0

これはバージョン管理の問題のようです。どのバージョンのMQTTとSparkを使用していますか? –

+0

spark-2.0.0-bin-hadoop2.7、Watson IoTはMQTT V3.1.1を使用していますimho –

+0

あなたのユーザー名とパスワードを投稿したようですが、誰でもすぐに使用できるようにこれらの資格情報をすぐに取り消してください。 – hardillb

答えて

1

TCP接続を閉じる
以降の接続のためにあなたが同じクライアントIDを再利用しているので、あなたの問題があるようです:クライアントID = ":vy0z2s:-vy0z2s-XXXXXXXXXX" プロトコル= mqtt4-TCPエンドポイント= "MQTT "RC = 288 Reason ="クライアントIDが再利用されました。 "

クライアントIDごとに1つの一意の接続しか許可されません。同じIDを使用して2つの同時接続を行うことはできません。

クライアントIDを確認し、同じアプリの複数のインスタンスが一意のクライアントIDを使用していることを確認してください。アプリケーションは同じAPIキーを共有できますが、MQTTではクライアントIDが常に一意である必要があります。