Dstreamのrddからサンプルを取りたいと思います。 DSTREAMはsample()
変換を持っていない、それは私がDSTREAMからサンプルを取り、その上に語数を適用するためにこれをしたRDDSのシーケンスであるとして:Spark Dstreamで簡単なランダムサンプリングを行う方法(spark 1.6.1を使用したpyspark)
from pyspark import SparkContext
from pyspark import SparkConf
# Optionally configure Spark Settings
conf=SparkConf()
conf.set("spark.executor.memory", "1g")
conf.set("spark.cores.max", "2")
conf.setAppName("SRS")
sc = SparkContext('local[3]', conf=conf)
from pyspark.streaming import StreamingContext
streamContext = StreamingContext(sc,3)
lines = streamContext.socketTextStream("localhost", 9000)
def sampleWord(rdd):
return rdd.sample(false,0.5,10)
lineSample = lines.foreachRDD(sampleWord)
words = lineSample.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word , 1))
wordCount = pairs.reduceByKey(lambda x, y: x + y)
wordCount.pprint(60)
streamContext.start()
streamContext.stop()
このコードでは、開始スパークが、何も正確に起こりません。 rdd.sample()
がこのように動作しない理由がわかりません。 foreachRDD
を使用すると、ストリームの各rddにアクセスできるので、今度はrddに固有の変換を使用できると思います。
は再びsampleWord =>リターンrdd.sampleに誤り(偽、0.5,100) があります。そして何も起こらない。スパークがサンプルを計算しているかどうかわからない。 – YyAaSs