2017-04-22 17 views
2

:今numpyの配列比較とインデックス

>>> np.size(array1) 
4004001 
>>> np.size(array2) 
1000 

を、配列2内の各要素は、最も近い値を持つ要素を見つけるために、配列1のすべての要素と比較される必要があります配列2のこの要素の値。 この値を見つけると、array2に対応するサイズの1つである1000の別の配列に格納する必要があります。

これは、forループを使用して配列2の各要素を取り出し、配列1の要素からその絶対値を減算した後、最小値をとることができます。

私はこれを行うためにnumpyベクトル化演算を使用したいと思いますが、私は壁に当たっています。

+1

両方の配列を最初に並べ替えます。その後、大きな配列をステップ実行し、小さな配列の現在の最も近い要素へのインデックスを保持します。必要に応じてインデックスをインクリメントします。これをスピードアップするものがあれば、私はひどく驚くことはありません。 –

+1

[numpy配列で最も近い値を見つける]の複製があります。(0120-18751) –

答えて

1

並列処理を最大限に活用するには、ベクトル化された関数が必要です。さらに、すべての値は、同じ基準(最も近い)を使用して同じ配列(array1)にあります。したがって、特にarray1を検索するための特別な機能を作ることができます。

しかし、ソリューションをより再利用可能にするには、より一般的なソリューションを作成し、より具体的なソリューションに変換する方がよいでしょう。従って、最も近い値を見つけるための一般的なアプローチとして、我々はthis find nearest solutionで始まる。その後、我々はそれが一度に複数の要素で作業できるようにするために、より具体的にすることをオンにし、それをベクトル:

import math 
import numpy as np 
from functools import partial 

def find_nearest_sorted(array,value): 
    idx = np.searchsorted(array, value, side="left") 
    if idx > 0 and (idx == len(array) or math.fabs(value - array[idx-1]) < math.fabs(value - array[idx])): 
     return array[idx-1] 
    else: 
     return array[idx] 

array1 = np.random.rand(4004001) 
array2 = np.random.rand(1000) 

array1_sorted = np.sort(array1) 

# Partially apply array1 to find function, to turn the general function 
# into a specific, working with array1 only. 
find_nearest_in_array1 = partial(find_nearest_sorted, array1_sorted) 

# Vectorize specific function to allow us to apply it to all elements of 
# array2, the numpy way. 
vectorized_find = np.vectorize(find_nearest_in_array1) 

output = vectorized_find(array2) 

がうまくいけば、これは最も近い値にarray2内のデータをマッピングし、あなたが望んで新しいベクトルでありますarray1

+0

そして、 'array1'を調べるためにmultiple 1回のソートコストを要するアレイを最初にソートすると、それに続く各検出操作を高速化することが有益です。 – JohanL

+0

@JohanLと皆さん、助けてくれてありがとう!私は以前はfunctoolsを使ったことがありません。これは素晴らしい! – sb25

0
import numpy as np 
a = np.random.random(size=4004001).astype(np.float16) 
b = np.random.random(size=1000).astype(np.float16) 
#use numpy broadcasting to compare pairwise difference and then find the min arg in a for each element in b. Finally extract elements from a using the argmin array as indexes. 
output = a[np.argmin(np.abs(b[:,None] -a),axis=1)] 

この解決策は簡単ですが、非常にメモリを消費します。大規模な配列で使用する場合は、さらに最適化する必要があります。

+0

このソリューションの時間と空間の複雑さは、問題が4004001x1000の行列に展開され、次に 'array1'をソートしないので、find(' min ')演算が必要以上に遅くなるためすることが。 – JohanL

+0

ええ、私はそれを実現しました。私はそのシンプルさを維持しながら最適化する方法を考えています。 – Allen

+0

答えを編集して説明を加えてください。コードのみの回答は、今後のSO読者の教育にはほとんど役に立ちません。あなたの答えは低品質であるためにモデレーションキューにあります。 – mickmackusa

0

最も「numpythonic」の方法は、broadcastingを使用することです。これは、距離行列を計算するための素早く簡単な方法です。その場合、絶対値のargminを取ることができます。 dmat

array1 = np.random.rand(4004001) 
array2 = np.random.rand(1000) 

# Calculate distance matrix (on truncated array1 for memory reasons) 
dmat = array1[:400400] - array2[:,None] 

# Take the abs of the distance matrix and work out the argmin along the last axis 
ix = np.abs(dmat).argmin(axis=1) 

は形状:

(1000, 400400) 

ixやコンテンツの形状:

(1000,)  
array([237473, 166831, 72369, 11663, 22998, 85179, 231702, 322752, ...]) 

は、あなたが一度にこの操作を行う場合は、それはメモリ空腹だし、実際にはしません指定した配列のサイズのために私の8GBマシンで作業します。そのため、array1のサイズを小さくしました。

メモリの制約内で動作させるには、アレイの1つをチャンクにスライスし、順番に(または並列に)各チャンクにブロードキャストを適用します。この場合、array2を10チャンクにスライスしました。

# Define number of chunks and calculate chunk size 
n_chunks = 10 
chunk_len = array2.size // n_chunks 

# Preallocate output array 
out = np.zeros(1000) 

for i in range(n_chunks): 
    s = slice(i*chunk_len, (i+1)*chunk_len) 
    out[s] = np.abs(array1 - array2[s, None]).argmin(axis=1) 
+0

あなたのソリューションは、チャンクであってもまだメモリが空いています。また、min演算がO(n)であるため、ソートされていないリストに対しても非常に遅いです。だからこそ私は幾分複雑なアプローチの必要性を感じましたが、時間の複雑さは大幅に改善されました。 – JohanL

+0

しかし、それは動作し、理解しやすいです。パラレル化によって解決できないOPの速度とメモリが重要な問題である場合、より複雑なアプローチが正当化されます。 – FuzzyDuck