2017-07-17 6 views
0

私は特定のページに属しているドキュメントのコレクションを持っています。私は各ドキュメントのTFIDFスコアを計算しましたが、私がしたいのは、ドキュメントに基づいて各ページのTFIDFスコアを平均化することです。グループごとのPySparkの平均TFIDF機能

希望の出力は、N(ページ)×M(ボキャブラリ)マトリックスです。 Spark/PySparkでこれをどうやって行うのですか?

from pyspark.ml.feature import CountVectorizer, IDF, Tokenizer, StopWordsRemover 
from pyspark.ml import Pipeline 

tokenizer = Tokenizer(inputCol="message", outputCol="tokens") 
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="filtered") 
countVec = CountVectorizer(inputCol=remover.getOutputCol(), outputCol="features", binary=True) 
idf = IDF(inputCol=countVec.getOutputCol(), outputCol="idffeatures") 

pipeline = Pipeline(stages=[tokenizer, remover, countVec, idf]) 

model = pipeline.fit(sample_results) 
prediction = model.transform(sample_results) 

パイプラインからの出力は、以下の形式です。ドキュメントごとに1行

(466,[10,19,24,37,46,61,62,63,66,67,68,86,89,105,107,129,168,217,219,289,310,325,377,381,396,398,411,420,423],[1.6486586255873816,1.6486586255873816,1.8718021769015913,1.8718021769015913,2.159484249353372,2.159484249353372,2.159484249353372,2.159484249353372,2.159484249353372,2.159484249353372,2.159484249353372,2.159484249353372,2.159484249353372,2.159484249353372,2.159484249353372,2.5649493574615367,2.5649493574615367,2.5649493574615367,2.5649493574615367,2.5649493574615367,2.5649493574615367,2.5649493574615367,2.5649493574615367,2.5649493574615367,2.5649493574615367,2.5649493574615367,2.5649493574615367,2.5649493574615367,2.5649493574615367]) 

答えて

0

私は以下の答えを思いつきました。それは動作しますが、最も効率的です。私はそれをthis postに基づいています。

def as_matrix(vec): 
    data, indices = vec.values, vec.indices 
    shape = 1, vec.size 
    return csr_matrix((data, indices, np.array([0, vec.values.size])), shape) 

def as_array(m): 
    v = vstack(m).mean(axis=0) 
    return v 


mats = prediction.rdd.map(lambda x: (x['page_name'], as_matrix(x['idffeatures']))) 
final = mats.groupByKey().mapValues(as_array).cache() 

私は最終的に1つの86 x 10000 numpyのマトリックスにスタックします。すべてが動いていますが、ゆっくりとしています。

labels = [l[0] for l in final] 
tf_matrix = np.vstack([r[1] for r in final]) 
関連する問題