2013-04-07 18 views
16

コードを最適化しようとすると、最後のリソースは複数のコアを使って以下のコードを実行しようとするようです。私はコードを変換/再構成する方法を正確には知らないので、複数のコアを使って実行するほうがはるかに高速です。最終目標を達成するための指導を受けることができれば幸いです。最終的な目標は、各配列が約70万の要素を保持する配列AとBに対して、このコードをできるだけ高速に実行できることです。ここでは小さな配列を使用したコードです。 700k要素配列はコメントアウトされています。MATLABの "ismember"関数に相当するPython

import numpy as np 

def ismember(a,b): 
    for i in a: 
     index = np.where(b==i)[0] 
     if index.size == 0: 
      yield 0 
     else: 
      yield index 


def f(A, gen_obj): 
    my_array = np.arange(len(A)) 
    for i in my_array: 
     my_array[i] = gen_obj.next() 
    return my_array 


#A = np.arange(700000) 
#B = np.arange(700000) 
A = np.array([3,4,4,3,6]) 
B = np.array([2,5,2,6,3]) 

gen_obj = ismember(A,B) 

f(A, gen_obj) 

print 'done' 
# if we print f(A, gen_obj) the output will be: [4 0 0 4 3] 
# notice that the output array needs to be kept the same size as array A. 

私は何をしようとしていると、MATLAB関数のようにフォーマットされ[2](1 ismemberと呼ばれる模倣することである:[Lia,Locb] = ismember(A,B)私はちょうどLocb一部のみを取得しようとしています

。 MATLABから

:AはBのメンバー主の

ものではないどこLocbは、出力アレイ、Locbは、0が含まれているBのメンバーである各値に対してBの最下位インデックスを含みます問題は私がablになる必要があるということですeこの操作を可能な限り効率的に実行する。テストのために、私は700k要素の2つの配列を持っています。ジェネレータを作成してジェネレータの値を調べても、そのジョブを速く完了させることはできません。

答えて

13

複数のコアを心配する前に、私は辞書を使用して、内容isMember関数の線形スキャンを排除するであろう:

def ismember(a, b): 
    bind = {} 
    for i, elt in enumerate(b): 
     if elt not in bind: 
      bind[elt] = i 
    return [bind.get(itm, None) for itm in a] # None can be replaced by any other "not in b" value 

あなたのオリジナルの実装は、Aの各要素に対してBの要素の完全スキャンを必要とし、それをO(len(A)*len(B))とする。上記のコードは、dict Bsetを生成するためにBのフルスキャンを1回必要とします。 dictを使うことによって、Aの各要素に対してB定数の各要素を効率的に検索し、操作をO(len(A)+len(B))にします。それでも遅すぎる場合は、上記の機能を複数のコアで実行することについて心配してください。

編集:インデックスを少し修正しました。その配列の全ては、Python/0で配列を開始するので、あなたがしている場合は、データ・セットは、この

A = [2378, 2378, 2378, 2378] 
B = [2378, 2379] 

のように見え、あなたがいない要素のために0を返し、その後、あなたの結果がするnumpyのインデックス1から始まりますので、MATLABは、0を使用しています上記のルーチンは、0ではなくインデックスがない場合はNoneを返します。-1を返すと、Pythonはそれを配列の最後の要素と解釈します。 Noneは配列へのインデックスとして使用されている場合に例外を送出します。異なる動作をしたい場合は、Bind.get(item,None)式の2番目の引数を返す値に変更します。

+0

これは本当に速いです!あなたは私があなたの解決策にどのくらい感謝しているか分かりません。どうもありがとうございました !特定のツールを使用してパフォーマンスプロファイルを出力しますか? – zd5151

+5

@ z5151いいえ、それは簡単なアルゴリズム分析です。 [Big-O表記法を使う](http://en.wikipedia.org/wiki/Big_O_notation): 'np.where'は、' O(len(B)) 'を必要とする' B'の線形スキャンを実行しなければなりません。オペレーション。 'O(len(A))'操作を必要とする外部ループを使用して、元のアルゴリズムをおおよそ 'O(len(A)* len(B))'操作にします。 'Bind'を生成するには' len(B) '操作が必要です。辞書は[ハッシュテーブル](http://en.wikipedia.org/wiki/Hash_table)として実装されており、一定の 'O(1)'ルックアップを持ち、スキャンAは 'O(len(A))'です。全体の複雑さは 'O(len(A)+ len(B))'です。 – sfstewman

+0

入手しました。ウィキペディアの参考に感謝します。 – zd5151

1

リスト内包表記を使用してみてください。

In [1]: import numpy as np 

In [2]: A = np.array([3,4,4,3,6]) 

In [3]: B = np.array([2,5,2,6,3]) 

In [4]: [x for x in A if not x in B] 
Out[4]: [4, 4] 

一般に、リストの補完はforループよりもはるかに高速です。

等しい長さのリストを取得するには、

In [19]: map(lambda x: x if x not in B else False, A) 
Out[19]: [False, 4, 4, False, False] 

これは、小さなデータセットのために非常に高速です:

In [20]: C = np.arange(10000) 

In [21]: D = np.arange(15000, 25000) 

In [22]: %timeit map(lambda x: x if x not in D else False, C) 
1 loops, best of 3: 756 ms per loop 

大規模なデータセットの場合は、あなたが操作をスピードアップするためにmultiprocessing.Pool.map()を使用して試みることができます。

+0

出力配列が同じサイズを維持することが必要です。 – zd5151

+0

@ z5151:詳細な回答をご覧ください。必要ならば 'lambda'式をFalseの代わりに0を返すように変更することができますが、それは結果の中の実数の0をマスクします。 –

+0

これは、要素数の少ない配列に便利です。リスト内包表記がループよりもはるかに高速であることを強調していただきありがとうございます。 – zd5151

10

sfstewmanの優れた答えがあなたの問題を解決する可能性が最も高いです。

numpyで同じことをどのように達成できるかを追加したいと思います。

numpyのuniquein1dの機能を利用しています。

B_unique_sorted, B_idx = np.unique(B, return_index=True) 
B_in_A_bool = np.in1d(B_unique_sorted, A, assume_unique=True) 
  • B_unique_sortedソートBで一意の値が含まれています。
  • B_idxは、これらの値に対して元のインデックスのインデックスを保持します。B
  • B_in_A_boolは、B_unique_sortedのサイズで、の値がAにあるかどうかを格納する が格納されるブール値配列です。
    注:私はAが既に一意であることを前提としています。私は、出力がB_idx
    注意に関して返却する必要があるため私はに(Bからユニークヴァルス)のために調べる必要があります。

は今、あなたは、元のB

B_idx[B_in_A_bool] 

に入る共通ヴァルス

B_unique_sorted[B_in_A_bool] 

とそれぞれの指標のいずれかにB_in_A_boolを使用することができます最後に、私はこれがより大幅に高速であることを前提としてい私はそれをテストしなかったが、純粋なPython for-loop。

+0

+1できるだけnumpyを使用するために、この方法で大きなスピードアップを達成することができます(私は難しい方法を学んだので_ <) –

+1

気をつけろ!これはインデックスの順序を保持しません!範囲(1,6)と[5,1]で試してみてください。インデックスの順序が必要ない場合は、np.in1d()を使用してからnp.nonzero()[0] – aless80

+1

を使用してください。https://stackoverflow.com/questions/33678543/finding -indices-of-matches-of-one-array-in-another-array - 正しい順序でインデックスを取得するための配列 – aless80

0

ここでは、MATLABと一致する出力引数[Lia、Locb]とPython 0以外の両方を返す正確なMATLAB相当のものがあります。これも有効なインデックスです。したがって、この関数は0を返しません。 Locb(Locb> 0)を返します。パフォーマンスはMATLABと同等です。

def ismember(a_vec, b_vec): 
    """ MATLAB equivalent ismember function """ 

    bool_ind = np.isin(a_vec,b_vec) 
    common = a[bool_ind] 
    common_unique, common_inv = np.unique(common, return_inverse=True)  # common = common_unique[common_inv] 
    b_unique, b_ind = np.unique(b_vec, return_index=True) # b_unique = b_vec[b_ind] 
    common_ind = b_ind[np.isin(b_unique, common_unique, assume_unique=True)] 
    return bool_ind, common_ind[common_inv] 

ビット(〜5倍)遅くなりますが、独自の機能を使用していない別の実装はここにある:

def ismember(a_vec, b_vec): 
    ''' MATLAB equivalent ismember function. Slower than above implementation''' 
    b_dict = {b_vec[i]: i for i in range(0, len(b_vec))} 
    indices = [b_dict.get(x) for x in a_vec if b_dict.get(x) is not None] 
    booleans = np.in1d(a_vec, b_vec) 
    return booleans, np.array(indices, dtype=int) 
関連する問題