2016-10-16 14 views
1

私は純粋なPythonで、次のコードを書いて、それが何をするかの説明はドキュメンテーション文字列である:Cython numpyの配列インデクサ速度向上

import numpy as np 
from scipy.ndimage.measurements import find_objects 
import itertools 

def alt_indexer(arr): 

    """ 
    Returns a dictionary with the elements of arr as key 
    and the corresponding slice as value. 

    Note: 

     This function assumes arr is sorted. 

    Example: 

     >>> arr = [0,0,3,2,1,2,3] 
     >>> loc = _indexer(arr) 
     >>> loc 
     {0: (slice(0L, 2L, None),), 
     1: (slice(2L, 3L, None),), 
     2: (slice(3L, 5L, None),), 
     3: (slice(5L, 7L, None),)} 
     >>> arr = sorted(arr) 
     >>> arr[loc[3][0]] 
     [3, 3] 
     >>> arr[loc[2][0]] 
     [2, 2] 

    """ 

    unique, counts = np.unique(arr, return_counts=True) 
    labels = np.arange(1,len(unique)+1) 
    labels = np.repeat(labels,counts) 

    slicearr = find_objects(labels) 
    index_dict = dict(itertools.izip(unique,slicearr)) 

    return index_dict 

私は非常に大きな配列のインデックスを作成されますので、私はスピードアップしたかったですここでは、cythonを使っての操作は同等の実装です::私はそれが両方の操作を完了するのに要した時間を比較した

import numpy as np 
cimport numpy as np 

def _indexer(arr): 

    cdef tuple unique_counts = np.unique(arr, return_counts=True) 
    cdef np.ndarray[np.int32_t,ndim=1] unique = unique_counts[0] 
    cdef np.ndarray[np.int32_t,ndim=1] counts = unique_counts[1].astype(int) 

    cdef int start=0 
    cdef int end 
    cdef int i 
    cdef dict d ={} 

    for i in xrange(len(counts)): 
     if i>0: 
      start = counts[i-1]+start 
     end=counts[i]+start 
     d[unique[i]]=slice(start,end) 
    return d 

ベンチマーク

In [26]: import numpy as np 

In [27]: rr=np.random.randint(0,1000,1000000) 

In [28]: %timeit _indexer(rr) 
10 loops, best of 3: 40.5 ms per loop 

In [29]: %timeit alt_indexer(rr) #pure python 
10 loops, best of 3: 51.4 ms per loop 

速度向上は最小限に抑えられています。私はnumpyを使用して以来、私のコードはすでに部分的に最適化されていたことを認識しています。

私が気づいていないボトルネックはありますか? np.uniqueを使用せず、代わりに自分の実装を書いてください。

ありがとうございました。

def unique_counts(arr): 
    counts = np.bincount(arr) 
    mask = counts!=0 
    unique = np.nonzero(mask)[0] 
    return unique, counts[mask] 

ランタイムテスト

ケース - 非負、非常に大きく、多くのない繰り返しint番号を持つarr

+1

'cython'ループは、純粋な' C'に変換できれば高速です。あなたの場合、ループは 'numpy.unique'とPython辞書とスライスオブジェクトを使います。 – hpaulj

答えて

1

は、ここnp.unique(arr, return_counts=True)と同じ動作をシミュレートするnp.bincountを使用して別のアプローチです#1:

In [83]: arr = np.random.randint(0,100,(1000)) # Input array 

In [84]: unique, counts = np.unique(arr, return_counts=True) 
    ...: unique1, counts1 = unique_counts(arr) 
    ...: 

In [85]: np.allclose(unique,unique1) 
Out[85]: True 

In [86]: np.allclose(counts,counts1) 
Out[86]: True 

In [87]: %timeit np.unique(arr, return_counts=True) 
10000 loops, best of 3: 53.2 µs per loop 

In [88]: %timeit unique_counts(arr) 
100000 loops, best of 3: 10.2 µs per loop 

ケース#2:

In [89]: arr = np.random.randint(0,1000,(10000)) # Input array 

In [90]: %timeit np.unique(arr, return_counts=True) 
1000 loops, best of 3: 713 µs per loop 

In [91]: %timeit unique_counts(arr) 
10000 loops, best of 3: 39.1 µs per loop 

ケース#3:のが最大の範囲に分でいくつか不足している番号を持つuniqueでケースを実行し、健全性チェックとしてnp.uniqueバージョンに対する結果を検証してみましょう。このケースでは、繰り返し回数が増えないため、パフォーマンスが向上するとは予想されません。

In [98]: arr = np.random.randint(0,10000,(1000)) # Input array 

In [99]: unique, counts = np.unique(arr, return_counts=True) 
    ...: unique1, counts1 = unique_counts(arr) 
    ...: 

In [100]: np.allclose(unique,unique1) 
Out[100]: True 

In [101]: np.allclose(counts,counts1) 
Out[101]: True 

In [102]: %timeit np.unique(arr, return_counts=True) 
10000 loops, best of 3: 61.9 µs per loop 

In [103]: %timeit unique_counts(arr) 
10000 loops, best of 3: 71.8 µs per loop 
+0

ニースのソリューション!残念ながら、整数は1000000から始まります。場合によっては、配列に浮動小数点数が含まれます。 – snowleopard

+0

@snowleopardええ、次に、最適化するための他の方法を探すべきです。 – Divakar

+0

あなたは正しいですが、np.uniqueを何かに置き換える必要があるようです。私が本当に必要とするのは、スライスの一部であり、いくつかの冗長な操作があるようです。 – snowleopard

関連する問題