2017-05-16 12 views
0

こんにちは、私はPysparkストリーミングが初めてです。Pysparkストリーミング変換エラー

numbers0 = sc.parallelize([1,2,3,4,5]) 

numbers1 = sc.parallelize([2,3,4,5,6]) 

numbers2 = sc.parallelize([3,4,5,6,7]) 

stream0 = ssc.queueStream([numbers0, numbers1, numbers2]) 

stream0.pprint() 

ssc.start() 
ssc.awaitTermination(20) 
ssc.stop() 

これは、すぐに私はエラーを取得する次の手順を実行して正常に動作しますが、私が欲しいもの

stream1 = stream0.transform(lambda x: x.mean()) 

stream1.pprint() 

ssc.start() 
ssc.awaitTermination(20) 
ssc.stop() 

は私の前のストリームの平均値で構成されていることをストリームです。 誰かが私が何をしなければならないか知っていますか?

+0

ようforEachRDD出力動作の内部でこれを行うと、エラーは何ですか? –

+0

このように変換を使用すると、各RDDの手段である3つのエントリが得られます。 –

答えて

0

変換を呼び出すときに表示されるエラーは、Spark's documentation for the transform operationに記載されているRDD-RDD機能が必要なためです。 meanがRDD上で呼び出されると、それは新しいRDDを返さず、したがってエラーを返します。

ここから、DStreamで構成される各RDDの平均を計算したいと思っています。 DStreamはqueueStreamで作成され、指定されたパラメータoneAtATimeはデフォルトのままなので、プログラムはすべてのバッチ間隔で1つのRDDを消費します。各RDDの平均を計算するには

、あなたは通常、この

# Create stream0 as you do in your example 

def calculate_mean(rdd): 
    mean_value = rdd.mean() 
    # do other stuff with mean_value like saving it to a database or just print it 

stream0.forEachRDD(calculate_mean) 

# Start and stop the Streaming Context 
関連する問題