私は、次のスパーク仕事を持っている:CassandraからSparkへのストリーミングを有効にするには?
from __future__ import print_function
import os
import sys
import time
from random import random
from operator import add
from pyspark.streaming import StreamingContext
from pyspark import SparkContext,SparkConf
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext, Row
from pyspark.streaming import StreamingContext
from pyspark_cassandra import streaming,CassandraSparkContext
if __name__ == "__main__":
conf = SparkConf().setAppName("PySpark Cassandra Test")
sc = CassandraSparkContext(conf=conf)
stream = StreamingContext(sc, 2)
rdd=sc.cassandraTable("keyspace2","users").collect()
#print rdd
stream.start()
stream.awaitTermination()
sc.stop()
私はこれを実行すると、それは私に次エラー与える:
ERROR StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: \
No output operations registered, so nothing to execute
をシェルスクリプトを私が実行します。
./bin/spark-submit --packages TargetHolding:pyspark-cassandra:0.2.4 example
s/src/main/python/test/reading-cassandra.py
スパーの比較カフカとストリーミングK、私は上記のコードから欠落しているこのライン持っている:私は実際にcreateStream
を使用してい
kafkaStream = KafkaUtils.createStream(stream, 'localhost:2181', "name", {'topic':1})
をしかし、カサンドラのために、私はドキュメント上で、このようなものを見ることができません。スパークストリーミングとcassandraの間でストリーミングを開始するにはどうすればよいですか?
バージョン:
Cassandra v2.1.12
Spark v1.4.1
Scala 2.10
CassandraからSparkにストリームしたいですか?私はそれが今サポートされているとは思わない。ストリーミングデータ*を* cassandraに保存することができます:https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md – maasg
うん、私はCASSANDRAからSPARKにストリームしたいです。私は私が書いたスクリプトにかなり近いと思っていました。おそらく、ストリームを使って操作を登録する必要がありました。それはおそらく "createStream"です。私は火花からキャサンドラに流れる方法を知っています。 – HackCode
テーブル全体( 'cassandraTable(" keyspace2 "、" users ")')をすべての時間間隔でストリーミングしますか? – maasg