2
sparkとmqttの新機能です。私はオンラインwordcount.pymqttとpysparkストリーミングの使用
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils
if __name__ == "__main__":
if len(sys.argv) != 3:
print >> sys.stderr, "Usage: mqtt_wordcount.py <broker url> <topic>"
exit(-1)
sc = SparkContext(appName="PythonStreamingMQTTWordCount")
ssc = StreamingContext(sc, 1)
brokerUrl = sys.argv[1]
topic = sys.argv[2]
lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
名付けてしまったと私は火花ストリーミングMQTT-assembly_2をダウンロードし、インストールしmosquittoブローカー(それが働いている)への指示に従っMQTTUtilsを使用してコードをしようとしています。 11-1.6.2.jarと、このコマンドを使用してPythonスクリプトを実行します。
〜$の_ * --jars火花ストリーミングMQTTアセンブリを、火花提出ジャーwordcount.pyをが、エラーが表示:。
from pyspark.streaming.mqttインポートMQTTUtils
ImportError:mqttという名前のモジュールがありません
私はここから何かを逃しましたか? ありがとうございます
[MCVE]を作成する方法。また、Spark 2.0+はMQTTバックエンドを提供しません。 Sparkパッケージに移行しました。 – zero323
私は同じ問題を抱えていましたが、私はversión2.0で作業していましたが、私はversión1.6.2で作業していて、スクリプトは動いています。 –