2017-01-16 7 views
1

MQTTソースを読み取るようにSpark Streamingを設定しようとしていますが、2番目のメッセージを受信すると例外が発生します。MQTTストラクチャード・ストリーミング

私は次のコードを持っている:

import java.sql.Timestamp 

import org.apache.bahir.sql.streaming.mqtt._ 
import org.apache.commons.io.FileUtils 
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession} 
import org.apache.spark.sql.types.StructType 

object App { 

    def main(args: Array[String]): Unit = { 

    val spark = SparkSession 
     .builder 
     .appName("StructuredNetworkWordCount") 
     .master("local[*]") 
     .getOrCreate() 

    import spark.implicits._ 

    // Read text from socket 
    val lines = spark.readStream.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider").option("topic", "measurement").option("username", "spark").option("password", "******").load("tcp://10.0.0.129:1883").as[(String, Timestamp)] 

    val query = lines.writeStream.format("console").start 

    query.awaitTermination() 

    } 
} 

をそして、私は2番目のメッセージ受信したときに、私は次の例外を守ってください。

17/01/16 16:18:33 ERROR StreamExecution: Query query-1 terminated with error 
java.lang.AbstractMethodError: org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.commit(Lorg/apache/spark/sql/execution/streaming/Offset;)V 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(StreamExecution.scala:359) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(StreamExecution.scala:358) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
    at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply$mcV$sp(StreamExecution.scala:358) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply(StreamExecution.scala:345) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply(StreamExecution.scala:345) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$reportTimeTaken(StreamExecution.scala:656) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch(StreamExecution.scala:345) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcZ$sp(StreamExecution.scala:219) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:213) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:213) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$reportTimeTaken(StreamExecution.scala:656) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:212) 
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:208) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:142) 
Exception in thread "stream execution thread for query-1" java.lang.AbstractMethodError: org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.commit(Lorg/apache/spark/sql/execution/streaming/Offset;)V 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(StreamExecution.scala:359) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(StreamExecution.scala:358) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
    at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply$mcV$sp(StreamExecution.scala:358) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply(StreamExecution.scala:345) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply(StreamExecution.scala:345) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$reportTimeTaken(StreamExecution.scala:656) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch(StreamExecution.scala:345) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcZ$sp(StreamExecution.scala:219) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:213) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:213) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$reportTimeTaken(StreamExecution.scala:656) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:212) 
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:208) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:142) 
org.apache.spark.sql.streaming.StreamingQueryException: Query query-1 terminated with exception: org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.commit(Lorg/apache/spark/sql/execution/streaming/Offset;)V 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:248) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:142) 
Caused by: java.lang.AbstractMethodError: org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.commit(Lorg/apache/spark/sql/execution/streaming/Offset;)V 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(StreamExecution.scala:359) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(StreamExecution.scala:358) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
    at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply$mcV$sp(StreamExecution.scala:358) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply(StreamExecution.scala:345) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply(StreamExecution.scala:345) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$reportTimeTaken(StreamExecution.scala:656) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch(StreamExecution.scala:345) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcZ$sp(StreamExecution.scala:219) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:213) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:213) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$reportTimeTaken(StreamExecution.scala:656) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:212) 
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:208) 
    ... 1 more 

を誰もがこの問題を持っていましたか?

+0

使用しているSparkとMQTTのバージョンはどれですか? –

+0

@YuvalItzchakov Spark 2.0.2とMQTT 3.1 –

+0

MQTT 3.1はSpark 2.0.2をサポートしていますか? –

答えて

-1

spark-MQTT jarはあなたのスパークと同じバージョンですか?異なるバージョンがこれらの問題を引き起こす可能性があります。 Cloudera Expressのスパーク1.6を使用した場合も同様の問題がありました。同じバージョンにアップグレードした後、問題は解決しました

+0

素晴らしい!私がSpark 2.1.0を使用していて、MQTT jarがバージョン2.0.1用に作られていることが分かりました。 –