みんな!TypeError: 'PipelinedRDD'タイプのオブジェクトにlen()がありません
コードスパークでPythonのコードを実行しているとき、私は、このエラーログを満たし:
メインの.py
sc = SparkContext(appName="newsCluster")
sc.addPyFile("/home/warrior/gitDir/pysparkCode/clusterNews/wordSeg.py")
sc.addPyFile("/home/warrior/gitDir/pysparkCode/clusterNews/sparkClusterAlgorithm.py")
wordseg = wordSeg()
clustermanage = sparkClusterAlgorithm()
files = sc.wholeTextFiles("hdfs://warrior:9000/testData/skynews")
file_list = files.map(lambda item: item[1])
file_wc_dict_list = file_list.map(lambda file_content:wordseg.GetWordCountTable(file_content))
file_wc_dict_list.persist()
all_word_dict = wordseg.updateAllWordList(file_wc_dict_list)
wordSeg.py
def updateAllWordList(self, newWordCountDictList):
'''
description: input an new file then update the all word list
input:
newWordCountDict: new input string word count dict
output:
all_word_dict
'''
n = len(newWordCountDictList)
all_word_list = []
all_word_dict = {}
for i in range(0,n):
all_word_list = list(set(all_word_list + newWordCountDictList[i].keys()))
for i in range(0,len(all_word_list)):
all_word_dict[all_word_list[i]]=0
return all_word_dict
...... 。 .......
spark メインの.py 出力エラーログを-submit:
Traceback (most recent call last):
File "/home/warrior/gitDir/pysparkCode/clusterNews/__main__.py", line 31, in <module>
all_word_dict = wordseg.updateAllWordList(file_wc_dict_list)#file_wc_dict_list.map(lambda file_wc_dict:wordseg.updateAllWordList(file_wc_dict))
File "/home/warrior/gitDir/pysparkCode/clusterNews/wordSeg.py", line 54, in updateAllWordList
n = len(newWordCountDictList)
TypeError: object of type 'PipelinedRDD' has no len()
それを解決する方法を! ありがとうございます!
あなたのコードが何をしているのか、そして何をすべきかに関する情報を提供してください。 – Oli
私はいくつかのニューステキストを収集し、それらをクラスタリングしてk-meansを使用したいと思います。私はローカルコードでそれを認識していますが、私はそれをsparkで実行したいと思います。 –
今私は以下のコードでそれを解決することができます。 1行のコード "file_wc_dict_list_result = file_wc_dict_list.collect()"を追加するだけです。私は今なぜ? –