2013-04-09 25 views
5

私はPythonの専門家ではありませんが、PCのすべてのCPUとコアを使用するマルチプロセッシングコードを書き留めることができました。私のコードは約1.6 GBの非常に大きな配列を読み込み、すべてのプロセスで配列を更新する必要があります。幸いにも、このアップデートは、人工的な星をイメージに追加することから成り、すべてのプロセスは、人工的な星を追加する場所の異なるイメージ位置を持っています。Pythonのマルチプロセッシングと共有変数

イメージが大きすぎます。プロセスを呼び出すたびに新しいイメージを作成することはできません。私の解決策は、共有メモリに変数を作成していたので、十分なメモリを節約できました。何らかの理由で、画像の90%で動作しますが、私のコードは、以前にプロセスに送信した位置のいくつかに乱数を追加していました。それは私が共有変数を作成する方法に関連していますか?自分のコードの実行中にプロセスが互いに干渉していますか?

奇妙なことに、単一のCPUとシングルコアを使用すると、画像は100%完璧で、画像に追加される乱数はありません。複数のプロセス間で大きな配列を共有する方法を提案しますか?ここで私のコードの関連部分。変数im_dataを定義すると、その行を読んでください。

import warnings 
warnings.filterwarnings("ignore") 

from mpl_toolkits.mplot3d import Axes3D 
from matplotlib import cm 
import matplotlib.pyplot as plt 
import sys,os 
import subprocess 
import numpy as np 
import time 
import cv2 as cv 
import pyfits 
from pyfits import getheader 
import multiprocessing, Queue 
import ctypes 

class Worker(multiprocessing.Process): 


def __init__(self, work_queue, result_queue): 

    # base class initialization 
    multiprocessing.Process.__init__(self) 

    # job management stuff 
    self.work_queue = work_queue 
    self.result_queue = result_queue 
    self.kill_received = False 

def run(self): 
    while not self.kill_received: 

     # get a task 
     try: 
      i_range, psf_file = self.work_queue.get_nowait() 
     except Queue.Empty: 
      break 

     # the actual processing 
     print "Adding artificial stars - index range=", i_range 

     radius=16 
     x_c,y_c=((psf_size[1]-1)/2, (psf_size[2]-1)/2) 
     x,y=np.meshgrid(np.arange(psf_size[1])-x_c,np.arange(psf_size[2])-y_c) 
     distance = np.sqrt(x**2 + y**2) 

     for i in range(i_range[0],i_range[1]): 
      psf_xy=np.zeros(psf_size[1:3], dtype=float) 
      j=0 
      for i_order in range(psf_order+1): 
       j_order=0 
       while (i_order+j_order < psf_order+1): 
        psf_xy += psf_data[j,:,:] * ((mock_y[i]-psf_offset[1])/psf_scale[1])**i_order * ((mock_x[i]-psf_offset[0])/psf_scale[0])**j_order 
        j_order+=1 
        j+=1 


      psf_factor=10.**((30.-mock_mag[i])/2.5)/np.sum(psf_xy) 
      psf_xy *= psf_factor 

      npsf_xy=cv.resize(psf_xy,(npsf_size[0],npsf_size[1]),interpolation=cv.INTER_LANCZOS4) 
      npsf_factor=10.**((30.-mock_mag[i])/2.5)/np.sum(npsf_xy) 
      npsf_xy *= npsf_factor 

      im_rangex=[max(mock_x[i]-npsf_size[1]/2,0), min(mock_x[i]-npsf_size[1]/2+npsf_size[1], im_size[1])] 
      im_rangey=[max(mock_y[i]-npsf_size[0]/2,0), min(mock_y[i]-npsf_size[0]/2+npsf_size[0], im_size[0])] 
      npsf_rangex=[max(-1*(mock_x[i]-npsf_size[1]/2),0), min(-1*(mock_x[i]-npsf_size[1]/2-im_size[1]),npsf_size[1])] 
      npsf_rangey=[max(-1*(mock_y[i]-npsf_size[0]/2),0), min(-1*(mock_y[i]-npsf_size[0]/2-im_size[0]),npsf_size[0])] 

      im_data[im_rangey[0]:im_rangey[1], im_rangex[0]:im_rangex[1]] = 10. 


     self.result_queue.put(id) 

if __name__ == "__main__": 

    n_cpu=2 
    n_core=6 
    n_processes=n_cpu*n_core*1 
    input_mock_file=sys.argv[1] 

    print "Reading file ", im_file[i] 
    hdu=pyfits.open(im_file[i]) 
    data=hdu[0].data 
    im_size=data.shape 

    im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1]) 
    im_data = np.ctypeslib.as_array(im_data_base.get_obj()) 
    im_data = im_data.reshape(im_size[0], im_size[1]) 
    im_data[:] = data 
    data=0 
    assert im_data.base.base is im_data_base.get_obj() 

    # run 
    # load up work queue 
    tic=time.time() 
    j_step=np.int(np.ceil(mock_n*1./n_processes)) 
    j_range=range(0,mock_n,j_step) 
    j_range.append(mock_n) 


    work_queue = multiprocessing.Queue() 
    for j in range(np.size(j_range)-1): 
    if work_queue.full(): 
     print "Oh no! Queue is full after only %d iterations" % j 
    work_queue.put((j_range[j:j+2], psf_file[i])) 

    # create a queue to pass to workers to store the results 
    result_queue = multiprocessing.Queue() 

    # spawn workers 
    for j in range(n_processes): 
    worker = Worker(work_queue, result_queue) 
    worker.start() 

    # collect the results off the queue 
    while not work_queue.empty(): 
    result_queue.get() 

    print "Writing file ", mock_im_file[i] 
    hdu[0].data=im_data 
    hdu.writeto(mock_im_file[i]) 
    print "%f s for parallel computation." % (time.time() - tic) 
+1

大きなアレイを共有する代わりに、小さなサブアレイに分割してこれらのサブアレイをサブプロセスに送信できませんか?結果を元の配列に戻します。 – freakish

+0

また、このような巨大な画像を処理するためにPythonとは異なるものを使用することを検討してください(C addon?)。 – freakish

答えて

3

私はこの問題は、(あなたがあなたの質問にそれを示唆するように)あなたが複数のスレッドから同じ配列でを書いているという事実から来ていると思います。

im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1]) 
im_data = np.ctypeslib.as_array(im_data_base.get_obj()) 
im_data = im_data.reshape(im_size[0], im_size[1]) 
im_data[:] = data 

私はあなたが(暗黙のロックは、アレイへのアクセスを同期するためのpythonで使用されている)、「プロセス・安全」な方法でim_data_baseに書き込むことができることをかなり確信していますが、私はあなたに書くことができるかわかりませんim_dataをプロセス安全な方法で実行します。

私はので、(私はあなたの問題を解決することを確認していないにもかかわらず)ロックあなたが必要とするたびに取得し、プロセスに続いてim_data

# Disable python implicit lock, we are going to use our own 
im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1], 
    lock=False) 
im_data = np.ctypeslib.as_array(im_data_base.get_obj()) 
im_data = im_data.reshape(im_size[0], im_size[1]) 
im_data[:] = data 
# Create our own lock 
im_data_lock = Lock() 

周り明示ロックを作成するようアドバイスしていますim_data

self.im_data_lock.acquire() 
im_data[im_rangey[0]:im_rangey[1], im_rangex[0]:im_rangex[1]] = 10 
self.im_data_lock.release() 

を修正するために、私はのためにあなたのプロセスのコンストラクタにロックを渡し、メンバーフィールド(self.im_data_lock)としてそれを格納するためのコードを省略しました簡潔さ。また、im_data配列をプロセスのコンストラクタに渡してメンバフィールドとして格納する必要があります。

1

問題は、複数のスレッドがイメージ/アレイの重複領域に書き込むときに発生します。したがって、イメージごとに1つのロックを設定するか、イメージのセクションごとにロックのセットを作成する必要があります(ロックの競合を減らすため)。

また、1セットのプロセスでイメージの修正を行い、実際のイメージを別の1つのスレッドで修正することもできます。

関連する問題