私はPythonの専門家ではありませんが、PCのすべてのCPUとコアを使用するマルチプロセッシングコードを書き留めることができました。私のコードは約1.6 GBの非常に大きな配列を読み込み、すべてのプロセスで配列を更新する必要があります。幸いにも、このアップデートは、人工的な星をイメージに追加することから成り、すべてのプロセスは、人工的な星を追加する場所の異なるイメージ位置を持っています。Pythonのマルチプロセッシングと共有変数
イメージが大きすぎます。プロセスを呼び出すたびに新しいイメージを作成することはできません。私の解決策は、共有メモリに変数を作成していたので、十分なメモリを節約できました。何らかの理由で、画像の90%で動作しますが、私のコードは、以前にプロセスに送信した位置のいくつかに乱数を追加していました。それは私が共有変数を作成する方法に関連していますか?自分のコードの実行中にプロセスが互いに干渉していますか?
奇妙なことに、単一のCPUとシングルコアを使用すると、画像は100%完璧で、画像に追加される乱数はありません。複数のプロセス間で大きな配列を共有する方法を提案しますか?ここで私のコードの関連部分。変数im_dataを定義すると、その行を読んでください。
import warnings
warnings.filterwarnings("ignore")
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
import matplotlib.pyplot as plt
import sys,os
import subprocess
import numpy as np
import time
import cv2 as cv
import pyfits
from pyfits import getheader
import multiprocessing, Queue
import ctypes
class Worker(multiprocessing.Process):
def __init__(self, work_queue, result_queue):
# base class initialization
multiprocessing.Process.__init__(self)
# job management stuff
self.work_queue = work_queue
self.result_queue = result_queue
self.kill_received = False
def run(self):
while not self.kill_received:
# get a task
try:
i_range, psf_file = self.work_queue.get_nowait()
except Queue.Empty:
break
# the actual processing
print "Adding artificial stars - index range=", i_range
radius=16
x_c,y_c=((psf_size[1]-1)/2, (psf_size[2]-1)/2)
x,y=np.meshgrid(np.arange(psf_size[1])-x_c,np.arange(psf_size[2])-y_c)
distance = np.sqrt(x**2 + y**2)
for i in range(i_range[0],i_range[1]):
psf_xy=np.zeros(psf_size[1:3], dtype=float)
j=0
for i_order in range(psf_order+1):
j_order=0
while (i_order+j_order < psf_order+1):
psf_xy += psf_data[j,:,:] * ((mock_y[i]-psf_offset[1])/psf_scale[1])**i_order * ((mock_x[i]-psf_offset[0])/psf_scale[0])**j_order
j_order+=1
j+=1
psf_factor=10.**((30.-mock_mag[i])/2.5)/np.sum(psf_xy)
psf_xy *= psf_factor
npsf_xy=cv.resize(psf_xy,(npsf_size[0],npsf_size[1]),interpolation=cv.INTER_LANCZOS4)
npsf_factor=10.**((30.-mock_mag[i])/2.5)/np.sum(npsf_xy)
npsf_xy *= npsf_factor
im_rangex=[max(mock_x[i]-npsf_size[1]/2,0), min(mock_x[i]-npsf_size[1]/2+npsf_size[1], im_size[1])]
im_rangey=[max(mock_y[i]-npsf_size[0]/2,0), min(mock_y[i]-npsf_size[0]/2+npsf_size[0], im_size[0])]
npsf_rangex=[max(-1*(mock_x[i]-npsf_size[1]/2),0), min(-1*(mock_x[i]-npsf_size[1]/2-im_size[1]),npsf_size[1])]
npsf_rangey=[max(-1*(mock_y[i]-npsf_size[0]/2),0), min(-1*(mock_y[i]-npsf_size[0]/2-im_size[0]),npsf_size[0])]
im_data[im_rangey[0]:im_rangey[1], im_rangex[0]:im_rangex[1]] = 10.
self.result_queue.put(id)
if __name__ == "__main__":
n_cpu=2
n_core=6
n_processes=n_cpu*n_core*1
input_mock_file=sys.argv[1]
print "Reading file ", im_file[i]
hdu=pyfits.open(im_file[i])
data=hdu[0].data
im_size=data.shape
im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1])
im_data = np.ctypeslib.as_array(im_data_base.get_obj())
im_data = im_data.reshape(im_size[0], im_size[1])
im_data[:] = data
data=0
assert im_data.base.base is im_data_base.get_obj()
# run
# load up work queue
tic=time.time()
j_step=np.int(np.ceil(mock_n*1./n_processes))
j_range=range(0,mock_n,j_step)
j_range.append(mock_n)
work_queue = multiprocessing.Queue()
for j in range(np.size(j_range)-1):
if work_queue.full():
print "Oh no! Queue is full after only %d iterations" % j
work_queue.put((j_range[j:j+2], psf_file[i]))
# create a queue to pass to workers to store the results
result_queue = multiprocessing.Queue()
# spawn workers
for j in range(n_processes):
worker = Worker(work_queue, result_queue)
worker.start()
# collect the results off the queue
while not work_queue.empty():
result_queue.get()
print "Writing file ", mock_im_file[i]
hdu[0].data=im_data
hdu.writeto(mock_im_file[i])
print "%f s for parallel computation." % (time.time() - tic)
大きなアレイを共有する代わりに、小さなサブアレイに分割してこれらのサブアレイをサブプロセスに送信できませんか?結果を元の配列に戻します。 – freakish
また、このような巨大な画像を処理するためにPythonとは異なるものを使用することを検討してください(C addon?)。 – freakish