kafkaからsparkストリーミングアプリケーションにcsvファイルを送信しようとしていますが、その方法はわかりません。私はここに多くの記事を読んだが、誰も私を助けなかった。CSVをKafkaからSpark Streamingに送信する
私のkafkaプロデューサがcsvを送信し、後でアプリケーション(コンシューマ)で分割したいと思いますが、それは重要ではありません。私はRDDを作成し、それをスパークするように送信しようとしました。 これは通常の文字列メッセージでは機能しましたが、csvには反映されませんでした。
これは私のプロデューサーである:
message =sc.textFile("/home/guest/host/Seeds.csv")
producer.send('test', message)
そして、私のスパーク消費者:
ssc = StreamingContext(sc, 5)
kvs = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer", {'test': 1}) data = kvs.map(lambda x: x[1])
counts = data.flatMap(lambda line: line.split(";")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
問題は、csvファイルを送信することにより、ドンをstreammingスパークということですイベントを受け取りません。 誰かがフォーマットや概念を教えてくれますか?
ドッキング・コンテナの下でpythonを使用しているノートブックでプロデューサとコンシューマを実行しています。
ありがとうございます。