2017-08-08 26 views
1

私は、多くのスパースベクトル(長さ250kの100kベクトル)の距離行列を計算する一般的な方法を構築しようとしています。私の例では、データはscipy csrマトリックスで表されています。これは私が何をやっているされていますpysparkスパースベクトルの距離行列を計算する

まず、私はSparseVectorsをpysparkするCSRの行を変換する方法を定義しています:

def csr_to_sparse_vector(row): 
    return SparseVector(row.shape[1], sorted(row.indices), row.data) 

は今、私はベクトルに行を変換し、Iリストに保存しますその後、SparkContextにフィード:私は(この記事に類似:Pyspark calculate custom distance between all vectors in a RDD)すべてのペアを構築するためにデカルト関数を使用して、次のステップで

sparse_vectors = [csr_to_sparse_vector(row) for row in refs_sample] 
rdd = sc.parallelize(sparse_vectors) 

をで私はそれに応じて定義されているジャカード類似性TJE使用したい、この実験:

def jacc_sim(pair): 
    dot_product = pair[0].dot(pair[1]) 
    try: 
     sim = dot_product/(pair[0].numNonzeros() + pair[1].numNonzeros()) 
    except ZeroDivisionError: 
     return 0.0 
    return sim 

今、私はちょうど機能をマップし、結果を収集する必要があります

distance_matrix = rdd2.map(lambda x: jacc_sim(x)).collect() 

私は小さなサンプルでこのコードを実行していますよローカルマシンと180ノードを持つクラスタの両方で100のドキュメントしか使用できません。タスクは永遠に終わり、最後にクラッシュします:https://pastebin.com/UwLUXvUZ

何か問題があったと思われるものはありますか?

さらに、距離測定値が対称sim(x、y)== sim(y、x)の場合、行列の上三角形が必要です。

rdd2 = rdd.cartesian(rdd).filter(lambda x: x[0] < x[1]) 

しかし、これはSparseVectorsのリストのために動作しません:私はフィルタリング(Upper triangle of cartesian in spark for symmetric operations: `x*(x+1)//2` instead of `x**2`)することによってこの問題を解決するポストを見つけました。

答えて

1

問題は、データを1000個のパーティションに分割する構成エラーでした。解決策は、単に彼が作成する必要が明示的にどのように多くのパーティションスパーク伝えることでした(例10):

rdd = sc.parallelize(sparse_vectors, 10) 
また

私が列挙してスパースベクトルのリストを拡張し、私はその後、一部ではないのペアを除外することができ、このよう上三角行列の:

属する類似の機能を次のようになります。

def jacc_sim(pair): 
    id_0 = pair[0][0] 
    vec_0 = pair[0][1] 
    id_1 = pair[1][0] 
    vec_1 = pair[1][1] 
    dot_product = vec_0.dot(vec_1) 
    try: 
     sim = dot_product/(vec_0.numNonzeros() + vec_1.numNonzeros()) 
     if sim > 0: 
      return (id_0, id_1, sim) 
    except ZeroDivisionError: 
     pass 
    return None 

これは私のために非常によく働いたと私は他の誰かウィルを願っています私はそれも便利だとわかる!

0

リストに問題がありますか、またはスパースベクトルがリストを構成していますか? 1つの考えはSparseVectorsをDenseVectorsに変換しようとすることです。私がここで見つけた提案(Convert Sparse Vector to Dense Vector in Pyspark)です。計算結果も変わりません。スパークがそれをどのように処理するかだけです。

+0

Hej @MisterJT、時間を割いていただきありがとうございます。クラッシュの原因となったスパークの設定に問題がありました。 – nadre

+0

@nadre、あなたがそれを見つけてうれしい。スパークライブラリ特有の設定か、お使いのマシンに特有の設定ですか? – MisterJT