2017-05-01 9 views
0

kafkaからsparkストリーミングアプリケーションにcsvファイルを送信しようとしていますが、その方法はわかりません。私はここに多くの記事を読んだが、誰も私を助けなかった。CSVをKafkaからSpark Streamingに送信する

私のkafkaプロデューサがcsvを送信し、後でアプリケーション(コンシューマ)で分割したいと思いますが、それは重要ではありません。私はRDDを作成し、それをスパークするように送信しようとしました。 これは通常の文字列メッセージでは機能しましたが、csvには反映されませんでした。

これは私のプロデューサーである:

message =sc.textFile("/home/guest/host/Seeds.csv")  
producer.send('test', message) 

そして、私のスパーク消費者:

ssc = StreamingContext(sc, 5) 

kvs = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer", {'test': 1}) data = kvs.map(lambda x: x[1]) counts = data.flatMap(lambda line: line.split(";")) \

.map(lambda word: (word, 1)) \ 
.reduceByKey(lambda a, b: a+b) 

問題は、csvファイルを送信することにより、ドンをstreammingスパークということですイベントを受け取りません。 誰かがフォーマットや概念を教えてくれますか?

ドッキング・コンテナの下でpythonを使用しているノートブックでプロデューサとコンシューマを実行しています。

ありがとうございます。

答えて

0

あなたのプロデューサーでは、メッセージは遅れて評価されるRDD(csvファイルラインの集合体です)です。つまり、アクションを実行するまで何もしません。ですから、カフカに送る前にRDDを収集する必要があります。 以下のリンクをご覧ください。 how to properly use pyspark to send data to kafka broker?

関連する問題