私はconsumer-producerアプリケーションを作成しようとしています。spark pythonを使ってcassandraテーブルにデータを保存する方法は?
アプリケーションのプロデューサは、特定のトピックに関するデータを生成します。消費者は同じトピックからこのデータを消費し、スパークAPIを使用して処理し、このデータをcassandraテーブルに格納します。 101 = III | | 102 = 0.0771387731911 | 103 = -0.7076915761 100 = NO | 101 = AAA | 102 = 0.8961325446464 | 103 = -0.5465463154
100 = NO - 以下のような文字列の形式で来
着信データ
私は怒鳴るようにし、消費者を作成しました:
from kafka import KafkaConsumer
from StringIO import StringIO
import pandas as pd
from cassandra.cluster import Cluster
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
def main():
sc = SparkContext(appName="StreamingContext")
ssc = StreamingContext(sc, 3)
kafka_stream = KafkaUtils.createStream(ssc, "localhost:2181", "sample-kafka-app", {"NO-topic": 1})
raw = kafka_stream.flatMap(lambda kafkaS: [kafkaS])
clean = raw.map(lambda xs: xs[1].split("|"))
my_row = clean.map(lambda x: {
"pk": "uuid()",
"a": x[0],
"b": x[1],
"c": x[2],
"d": x[3],
})
my_row.saveToCassandra("users", "data")
stream.start()
stream.awaitTermination()
if __name__ == "__main__":
main()
カサンドラテーブル構造 -
cqlsh:users> select * from data;
pk | a | b | c | d
----+---+---+---+---
CREATE TABLE users.data (
pk uuid PRIMARY KEY,
a text,
b text,
c text,
d text
)
私は、エラーの下に直面している -
Traceback (most recent call last):
File "consumer_no.py", line 84, in <module>
main()
File "consumer_no.py", line 53, in main
my_row.saveToCassandra("users", "data")
AttributeError: 'TransformedDStream' object has no attribute 'saveToCassandra'
17/04/04 14:29:22 INFO SparkContext: Invoking stop() from shutdown hook
私は上記の説明どの達成するために正しい方法でつもりされていますか?そうでない場合は、これを達成するための提案をしてください。そうであれば、上記のコードで何が間違っているか/欠けていますか?
可能な複製を(http://stackoverflow.com/questions/35414677/saving-data- –
「[sparkのsaveToCassandraを使用してcassandraテーブルにデータを保存する方法]の可能な複製」(http://stackoverflow.com/questions/43198661/how-to-save-data-in-カサンドラテーブル使用スパークス・サセトカサンドラ) – RussS