私は、多くのスパースベクトル(長さ250kの100kベクトル)の距離行列を計算する一般的な方法を構築しようとしています。私の例では、データはscipy csrマトリックスで表されています。これは私が何をやっているされていますpysparkスパースベクトルの距離行列を計算する
まず、私はSparseVectorsをpysparkするCSRの行を変換する方法を定義しています:
def csr_to_sparse_vector(row):
return SparseVector(row.shape[1], sorted(row.indices), row.data)
は今、私はベクトルに行を変換し、Iリストに保存しますその後、SparkContextにフィード:私は(この記事に類似:Pyspark calculate custom distance between all vectors in a RDD)すべてのペアを構築するためにデカルト関数を使用して、次のステップで
sparse_vectors = [csr_to_sparse_vector(row) for row in refs_sample]
rdd = sc.parallelize(sparse_vectors)
をで私はそれに応じて定義されているジャカード類似性TJE使用したい、この実験:
def jacc_sim(pair):
dot_product = pair[0].dot(pair[1])
try:
sim = dot_product/(pair[0].numNonzeros() + pair[1].numNonzeros())
except ZeroDivisionError:
return 0.0
return sim
今、私はちょうど機能をマップし、結果を収集する必要があります
distance_matrix = rdd2.map(lambda x: jacc_sim(x)).collect()
私は小さなサンプルでこのコードを実行していますよローカルマシンと180ノードを持つクラスタの両方で100のドキュメントしか使用できません。タスクは永遠に終わり、最後にクラッシュします:https://pastebin.com/UwLUXvUZ
何か問題があったと思われるものはありますか?
さらに、距離測定値が対称sim(x、y)== sim(y、x)の場合、行列の上三角形が必要です。
rdd2 = rdd.cartesian(rdd).filter(lambda x: x[0] < x[1])
しかし、これはSparseVectorsのリストのために動作しません:私はフィルタリング(Upper triangle of cartesian in spark for symmetric operations: `x*(x+1)//2` instead of `x**2`)することによってこの問題を解決するポストを見つけました。
Hej @MisterJT、時間を割いていただきありがとうございます。クラッシュの原因となったスパークの設定に問題がありました。 – nadre
@nadre、あなたがそれを見つけてうれしい。スパークライブラリ特有の設定か、お使いのマシンに特有の設定ですか? – MisterJT