2016-09-06 10 views
2

sparkとmqttの新機能です。私はオンラインwordcount.pymqttとpysparkストリーミングの使用

import sys 

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.mqtt import MQTTUtils 
if __name__ == "__main__": 
    if len(sys.argv) != 3: 
     print >> sys.stderr, "Usage: mqtt_wordcount.py <broker url> <topic>" 
     exit(-1) 

    sc = SparkContext(appName="PythonStreamingMQTTWordCount") 
    ssc = StreamingContext(sc, 1) 

    brokerUrl = sys.argv[1] 
    topic = sys.argv[2] 

    lines = MQTTUtils.createStream(ssc, brokerUrl, topic) 
    counts = lines.flatMap(lambda line: line.split(" ")) \ 
     .map(lambda word: (word, 1)) \ 
     .reduceByKey(lambda a, b: a+b) 
    counts.pprint() 

    ssc.start() 
    ssc.awaitTermination() 

名付けてしまったと私は火花ストリーミングMQTT-assembly_2をダウンロードし、インストールしmosquittoブローカー(それが働いている)への指示に従っMQTTUtilsを使用してコードをしようとしています。 11-1.6.2.jarと、このコマンドを使用してPythonスクリプトを実行します。

〜$の_ * --jars火花ストリーミングMQTTアセンブリを、火花提出ジャーwordcount.pyをが、エラーが表示:。

from pyspark.streaming.mqttインポートMQTTUtils

ImportError:mqttという名前のモジュールがありません

私はここから何かを逃しましたか? ありがとうございます

+2

[MCVE]を作成する方法。また、Spark 2.0+はMQTTバックエンドを提供しません。 Sparkパッケージに移行しました。 – zero323

+0

私は同じ問題を抱えていましたが、私はversión2.0で作業していましたが、私はversión1.6.2で作業していて、スクリプトは動いています。 –

答えて

3

スパークバージョン2の場合:*私たちはStructured StreamingでMQTTをBahir Jarを使用して使用できます。

pysparkからMQTTブローカーに接続します。

​​
関連する問題