2017-04-04 5 views
1

私はconsumer-producerアプリケーションを作成しようとしています。spark pythonを使ってcassandraテーブルにデータを保存する方法は?

アプリケーションのプロデューサは、特定のトピックに関するデータを生成します。消費者は同じトピックからこのデータを消費し、スパークAPIを使用して処理し、このデータをcassandraテーブルに格納します。 101 = III | | 102 = 0.0771387731911 | 103 = -0.7076915761 100 = NO | 101 = AAA | 102 = 0.8961325446464 | 103 = -0.5465463154

100 = NO - 以下のような文字列の形式で来

着信データ

私は怒鳴るようにし、消費者を作成しました:

from kafka import KafkaConsumer 
from StringIO import StringIO 
import pandas as pd 
from cassandra.cluster import Cluster 

from pyspark import SparkConf, SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 

def main(): 

    sc = SparkContext(appName="StreamingContext") 
    ssc = StreamingContext(sc, 3) 

    kafka_stream = KafkaUtils.createStream(ssc, "localhost:2181", "sample-kafka-app", {"NO-topic": 1}) 
    raw = kafka_stream.flatMap(lambda kafkaS: [kafkaS]) 
    clean = raw.map(lambda xs: xs[1].split("|")) 
    my_row = clean.map(lambda x: { 
     "pk": "uuid()", 
     "a": x[0], 
     "b": x[1], 
     "c": x[2], 
     "d": x[3], 
    }) 

    my_row.saveToCassandra("users", "data") 
    stream.start() 
    stream.awaitTermination() 

if __name__ == "__main__": 
    main() 

カサンドラテーブル構造 -

cqlsh:users> select * from data; 

pk | a | b | c | d 
----+---+---+---+--- 
CREATE TABLE users.data (
    pk uuid PRIMARY KEY, 
    a text, 
    b text, 
    c text, 
    d text 
) 

私は、エラーの下に直面している -

Traceback (most recent call last): 


File "consumer_no.py", line 84, in <module> 
    main() 
    File "consumer_no.py", line 53, in main 
    my_row.saveToCassandra("users", "data") 
AttributeError: 'TransformedDStream' object has no attribute 'saveToCassandra' 
17/04/04 14:29:22 INFO SparkContext: Invoking stop() from shutdown hook 

私は上記の説明どの達成するために正しい方法でつもりされていますか?そうでない場合は、これを達成するための提案をしてください。そうであれば、上記のコードで何が間違っているか/欠けていますか?

+0

可能な複製を(http://stackoverflow.com/questions/35414677/saving-data- –

+0

「[sparkのsaveToCassandraを使用してcassandraテーブルにデータを保存する方法]の可能な複製」(http://stackoverflow.com/questions/43198661/how-to-save-data-in-カサンドラテーブル使用スパークス・サセトカサンドラ) – RussS

答えて

0

TransformedDStreamをCassandraに直接保存するのではなく、そのDStreamの各RDDをcassandraに保存する必要があります。

あなたはこのような何かあればあなたのコードは動作するはずです:[バックRDDとしてカサンドラにデータを保存]の

my_row.foreachRDD(lambda x: x.saveToCassandra("users", "data")) 
関連する問題