2016-08-18 14 views
0

Dstreamのrddからサンプルを取りたいと思います。 DSTREAMはsample()変換を持っていない、それは私がDSTREAMからサンプルを取り、その上に語数を適用するためにこれをしたRDDSのシーケンスであるとして:Spark Dstreamで簡単なランダムサンプリングを行う方法(spark 1.6.1を使用したpyspark)

from pyspark import SparkContext 
from pyspark import SparkConf 

# Optionally configure Spark Settings 
conf=SparkConf() 
conf.set("spark.executor.memory", "1g") 
conf.set("spark.cores.max", "2") 

conf.setAppName("SRS") 
sc = SparkContext('local[3]', conf=conf) 

from pyspark.streaming import StreamingContext 
streamContext = StreamingContext(sc,3) 
lines = streamContext.socketTextStream("localhost", 9000) 

def sampleWord(rdd): 
    return rdd.sample(false,0.5,10) 


lineSample = lines.foreachRDD(sampleWord) 
words = lineSample.flatMap(lambda line: line.split(" ")) 
pairs = words.map(lambda word: (word , 1)) 
wordCount = pairs.reduceByKey(lambda x, y: x + y) 
wordCount.pprint(60) 


streamContext.start() 
streamContext.stop() 

このコードでは、開始スパークが、何も正確に起こりません。 rdd.sample()がこのように動作しない理由がわかりません。 foreachRDDを使用すると、ストリームの各rddにアクセスできるので、今度はrddに固有の変換を使用できると思います。

答えて

0

。また、コードに誤字があります。定義されていないグローバル名 '真':NameError:

def sampleWord(rdd): 
return rdd.sample(False,0.5,10) //False, not false 

lineSample = lines.transform(sampleWord) 
words = lineSample.flatMap(lambda line: line.split(" ")) 
pairs = words.map(lambda word: (word , 1)) 
wordCount = pairs.reduceByKey(lambda x, y: x + y) 
wordCount.pprint(60) 
0

使用transformtransformの代わりforeachRDDを使用して

lineSample = lines.transform(sampleWord) 
+0

は再びsampleWord =>リターンrdd.sampleに誤り(偽、0.5,100) があります。そして何も起こらない。スパークがサンプルを計算しているかどうかわからない。 – YyAaSs

関連する問題