2016-01-26 12 views
5

私は、次のスパーク仕事を持っている:CassandraからSparkへのストリーミングを有効にするには?

from __future__ import print_function 

import os 
import sys 
import time 
from random import random 
from operator import add 
from pyspark.streaming import StreamingContext 
from pyspark import SparkContext,SparkConf 
from pyspark.streaming.kafka import KafkaUtils 
from pyspark.sql import SQLContext, Row 
from pyspark.streaming import StreamingContext 
from pyspark_cassandra import streaming,CassandraSparkContext 

if __name__ == "__main__": 

    conf = SparkConf().setAppName("PySpark Cassandra Test") 
    sc = CassandraSparkContext(conf=conf) 
    stream = StreamingContext(sc, 2) 

    rdd=sc.cassandraTable("keyspace2","users").collect() 
    #print rdd 
    stream.start() 
    stream.awaitTermination() 
    sc.stop() 

私はこれを実行すると、それは私に次エラー与える:

ERROR StreamingContext: Error starting the context, marking it as stopped 
java.lang.IllegalArgumentException: requirement failed: \ 
No output operations registered, so nothing to execute 

シェルスクリプトを私が実行します。

./bin/spark-submit --packages TargetHolding:pyspark-cassandra:0.2.4 example 
s/src/main/python/test/reading-cassandra.py 

スパーの比較カフカとストリーミングK、私は上記のコードから欠落しているこのライン持っている:私は実際にcreateStreamを使用してい

kafkaStream = KafkaUtils.createStream(stream, 'localhost:2181', "name", {'topic':1}) 

をしかし、カサンドラのために、私はドキュメント上で、このようなものを見ることができません。スパークストリーミングとcassandraの間でストリーミングを開始するにはどうすればよいですか?

バージョン

Cassandra v2.1.12 
Spark v1.4.1 
Scala 2.10 
+0

CassandraからSparkにストリームしたいですか?私はそれが今サポートされているとは思わない。ストリーミングデータ*を* cassandraに保存することができます:https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md – maasg

+0

うん、私はCASSANDRAからSPARKにストリームしたいです。私は私が書いたスクリプトにかなり近いと思っていました。おそらく、ストリームを使って操作を登録する必要がありました。それはおそらく "createStream"です。私は火花からキャサンドラに流れる方法を知っています。 – HackCode

+0

テーブル全体( 'cassandraTable(" keyspace2 "、" users ")')をすべての時間間隔でストリーミングしますか? – maasg

答えて

0

カサンドラテーブルのうち、DSTREAMを作成するには、入力としてカサンドラテーブルから作成RDDを提供ConstantInputDStreamを使用することができます。これにより、各DStream間隔でRDDが実現されます。

サイズが大きくなるテーブルやテーブルのサイズが大きくなると、ストリーミングジョブのパフォーマンスに悪影響を与えることに注意してください。

も参照してください。例:Reading from Cassandra using Spark Streaming

+0

答えに感謝しますが、私はpysparkでその実装を探してみましたが、何も見つかりませんでした。それはPythonでサポートされていますか? – HackCode

+0

@HackCode Python APIをチェックした後、Pythonバインディングのために 'ConstantInputDStream'が存在しないようです:http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#module-pyspark 。ストリーム化 – maasg

+0

@HackCodeあなたはこれに対する解決策を見つけましたか? 'ConstantInputDStream'がPython APIに存在しない場合、PySpark StreamingはCassandraとどのように連携できますか? – user2361174

関連する問題