2016-03-21 10 views
0

みんな!TypeError: 'PipelinedRDD'タイプのオブジェクトにlen()がありません

コードスパークでPythonのコードを実行しているとき、私は、このエラーログを満たし:

メインの.py

sc = SparkContext(appName="newsCluster") 
sc.addPyFile("/home/warrior/gitDir/pysparkCode/clusterNews/wordSeg.py") 
sc.addPyFile("/home/warrior/gitDir/pysparkCode/clusterNews/sparkClusterAlgorithm.py") 
wordseg = wordSeg() 
clustermanage = sparkClusterAlgorithm() 
files = sc.wholeTextFiles("hdfs://warrior:9000/testData/skynews") 
file_list = files.map(lambda item: item[1]) 
file_wc_dict_list = file_list.map(lambda file_content:wordseg.GetWordCountTable(file_content)) 
file_wc_dict_list.persist() 
all_word_dict = wordseg.updateAllWordList(file_wc_dict_list) 

wordSeg.py

def updateAllWordList(self, newWordCountDictList): 
    ''' 
     description: input an new file then update the all word list 
     input: 
      newWordCountDict: new input string word count dict 
     output: 
      all_word_dict 
    ''' 
    n = len(newWordCountDictList) 
    all_word_list = [] 
    all_word_dict = {} 
    for i in range(0,n): 
     all_word_list = list(set(all_word_list + newWordCountDictList[i].keys())) 
    for i in range(0,len(all_word_list)): 
     all_word_dict[all_word_list[i]]=0 
    return all_word_dict 

...... 。 .......

spark メインの.py 出力エラーログを-submit:

Traceback (most recent call last):            
File "/home/warrior/gitDir/pysparkCode/clusterNews/__main__.py", line 31, in <module> 
    all_word_dict =  wordseg.updateAllWordList(file_wc_dict_list)#file_wc_dict_list.map(lambda  file_wc_dict:wordseg.updateAllWordList(file_wc_dict)) 
File "/home/warrior/gitDir/pysparkCode/clusterNews/wordSeg.py", line 54, in updateAllWordList 
    n = len(newWordCountDictList) 
TypeError: object of type 'PipelinedRDD' has no len() 

それを解決する方法を! ありがとうございます!

+0

あなたのコードが何をしているのか、そして何をすべきかに関する情報を提供してください。 – Oli

+0

私はいくつかのニューステキストを収集し、それらをクラスタリングしてk-meansを使用したいと思います。私はローカルコードでそれを認識していますが、私はそれをsparkで実行したいと思います。 –

+0

今私は以下のコードでそれを解決することができます。 1行のコード "file_wc_dict_list_result = file_wc_dict_list.collect()"を追加するだけです。私は今なぜ? –

答えて

2

newWordCountDictListは、ドライバプログラムのローカルコレクションオブジェクトではなく、RDD(分散オブジェクトおよび複数の作業ノードにあります)オブジェクトです。

あなたは正しい結果を得るために

n = newWordCountDictList.count() 

または

all_word_dict = wordseg.updateAllWordList(file_wc_dict_list.collect()) 

のいずれかを使用することができます。

+0

ありがとうございます!できます!! –

+0

あなたが気にしないなら、上記の答えを受け入れてください:) –

関連する問題