Pythonで値の配列を読み取り、データ配列の一部を操作するために非同期で複数のプロセスを実行する小さなマルチプロセッシングプログラムを書きました。それぞれの別々のプロセスは、2次元配列のそれ自身の1次元セクションでなければならず、プロセス間で重複はありません。すべてのプロセスが完了すると、共有メモリアレイはファイルに書き出されますが、共有メモリアレイに期待値/計算値は存在しませんが、元の値はまだ残っています。プロセス内の新しい値の割り当てが共有メモリオブジェクトに固執していないようです。おそらく、私の問題を引き起こしていることを私が理解していない(例えば、参照渡しと値渡し)ということが起こっているのでしょうか?Pythonマルチプロセッシング:共有メモリ(numpy)配列が期待どおりに変更されていない
私は多くのワーカープロセスを作成し、JoinableQueueをインスタンス化するProcessorクラスを持っています。各プロセスによって呼び出される関数は、2次元共有メモリ配列のインデックス付きスライス上で動作し、それらの配列値を適切に更新するので、入力(共有メモリ)配列はすべての値を計算の結果に置き換える必要があります。結果の2番目の配列を持つ必要はありません。 main関数は、共有メモリ配列とインデックス値をcompute関数の引数として渡します。これらは、プロセスオブジェクトが作業項目を消費するキューに追加されます。以下のコードは次のとおりです。私は間違って行くよどこ
class Processor:
queue = None
def __init__(self,
number_of_workers=1):
# create a joinable queue
self.queue = JoinableQueue()
# create the processes
self.processes = [Process(target=self.compute) for _ in range(number_of_workers)]
for p in self.processes:
p.start()
def add_work_item(self, item):
# add the parameters list to the parameters queue
self.queue.put(item)
def compute(self):
while True:
# get a list of arguments from the queue
arguments = self.queue.get()
# if we didn't get one we keep looping
if arguments is None:
break
# process the arguments here
data = arguments[0]
index = arguments[1]
# only process non-empty grid cells, i.e. data array contains at least some non-NaN values
if (isinstance(data[:, index], np.ma.MaskedArray) and data[:, index].mask.all()) or np.isnan(data[:, index]).all():
pass
else: # we have some valid values to work with
logger.info('Processing latitude: {}'.format(index))
# perform a fitting to gamma
results = do_something(data[:, index])
# update the shared array
data[:, index] = results
# indicate that the task has completed
self.queue.task_done()
def terminate(self):
# terminate all processes
for p in self.processes:
p.terminate()
def wait_on_all(self):
#wait until queue is empty
self.queue.join()
#-----------------------------------------------------------------------------------------------------------------------
if __name__ == '__main__':
try:
# log some timing info, used later for elapsed time
start_datetime = datetime.now()
logger.info("Start time: {}".format(start_datetime, '%x'))
# get the command line arguments
input_file = sys.argv[1]
input_var_name = sys.argv[2]
output_file_base = sys.argv[3]
month_scale = int(sys.argv[4])
# create the variable name from the indicator, distribution, and month scale
variable_name = 'spi_gamma_{}'.format(str(month_scale).zfill(2))
# open the NetCDF files
with netCDF4.Dataset(input_file) as input_dataset, \
netCDF4.Dataset(output_file_base + '_' + variable_name + '.nc', 'w') as output_dataset:
# read info from the input dataset and initialize the output for writing
# create a processor with a number of worker processes
number_of_workers = 1
processor = Processor(number_of_workers)
# for each longitude slice
for lon_index in range(lon_size):
logger.info('\n\nProcessing longitude: {}\n'.format(lon_index))
# read the longitude slice into a data array
longitude_slice = input_dataset.variables[input_var_name][:, lon_index, :]
# reshape into a 1-D array
original_shape = longitude_slice.shape
flat_longitude_slice = longitude_slice.flatten()
# convert the array onto a shared memory array which can be accessed from within another process
shared_array_base = Array(ctypes.c_double, flat_longitude_slice)
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
shared_array = shared_array.reshape(original_shape)
# loop over each latitude point in the longitude slice
for lat_index in range(lat_size):
# have the processor process the shared array at this index
arguments = [shared_array, lat_index]
processor.add_work_item(arguments)
# join to the processor and don't continue until all processes have completed
processor.wait_on_all()
# write the fitted longitude slice values into the output NetCDF
output_dataset.variables[variable_name][:, lon_index, :] = np.reshape(shared_array, (time_size, 1, lat_size))
# all processes have completed
processor.terminate()
except Exception, e:
logger.error('Failed to complete', exc_info=True)
raise
誰もが見ることができ、私は期待していて、すなわち、なぜ共有メモリ配列の値が更新されていませんか?
ご意見やご提案ありがとうございます。
UPDATE:私は、この単一のプロセスのために今働いているが、私は、複数のプロセスを起動しようとすると、私はピクルスのエラーを取得:
pickle.PicklingError: Can't pickle '_subprocess_handle' object: <_subprocess_handle object at 0x00000000021CF9F0>
第二のプロセスが開始されたときにこれは、プロセッサ内、発生。 ()ファンクションです。 1つのプロセス(number_of_workers = 1)を使用して以下のコードを実行すると、このエラーが発生せず、コードが複数のプロセッサを使用していなくても期待通りに実行されます。それはまだ予期しない動作を展示しているが、私は今、成功したソリューションを実装している
class Processor:
queue = None
def __init__(self,
shared_array,
data_shape,
number_of_workers=1):
# create a joinable queue
self.queue = JoinableQueue()
# keep reference to shared memory array
self.shared_array = shared_array
self.data_shape = data_shape
# create the processes
self.processes = [Process(target=self.compute_indicator) for _ in range(number_of_workers)]
for p in self.processes:
p.start()
def add_work_item(self, item):
# add the parameters list to the parameters queue
self.queue.put(item)
def compute_indicator(self):
while True:
# get a list of arguments from the queue
arguments = self.queue.get()
# if we didn't get one we keep looping
if arguments is None:
break
# process the arguments here
index = arguments[0]
# turn the shared array into a numpy array
data = np.ctypeslib.as_array(self.shared_array)
data = data.reshape(self.data_shape)
# only process non-empty grid cells, i.e. data array contains at least some non-NaN values
if (isinstance(data[:, index], np.ma.MaskedArray) and data[:, index].mask.all()) \
or np.isnan(data[:, index]).all() or (data[:, index] < 0).all():
pass
else: # we have some valid values to work with
logger.info('Processing latitude: {}'.format(index))
# perform computation
fitted_values = do_something(data[:, index])
# update the shared array
data[:, index] = fitted_values
# indicate that the task has completed
self.queue.task_done()
def terminate(self):
# terminate all processes
for p in self.processes:
p.terminate()
def wait_on_all(self):
#wait until queue is empty
self.queue.join()
#-----------------------------------------------------------------------------------------------------------------------
if __name__ == '__main__':
# create a shared memory array which can be accessed from within another process
shared_array_base = Array(ctypes.c_double, time_size * lat_size, lock=False)
# create a processor with a number of worker processes
number_of_workers = 4
data_shape = (time_size, lat_size)
processor = Processor(shared_array_base, data_shape, number_of_workers)
# for each longitude slice
for lon_index in range(lon_size):
logger.info('\n\nProcessing longitude: {}\n'.format(lon_index))
# get the shared memory array and convert into a numpy array with proper dimensions
longitude_array = np.ctypeslib.as_array(shared_array_base)
longitude_array = np.reshape(longitude_array, data_shape)
# read the longitude slice into the shared memory array
longitude_array[:] = input_dataset.variables[input_var_name][:, lon_index, :]
# a list of arguments we'll map to the processes of the pool
arguments_iterable = []
# loop over each latitude point in the longitude slice
for lat_index in range(lat_size):
# have the processor process the shared array at this index
processor.add_work_item([lat_index])
# join to the processor and don't continue until all processes have completed
processor.wait_on_all()
# get the longitude slice of fitted values from the shared memory array and convert
# into a numpy array with proper dimensions which we can then use to write to NetCDF
fitted_array = np.ctypeslib.as_array(shared_array_base)
fitted_array = np.reshape(fitted_array, (time_size, 1, lat_size))
# write the longitude slice of computed values into the output NetCDF
output_dataset.variables[variable_name][:, lon_index, :] = fitted_array
# all processes have completed
processor.terminate()
自分で書くのではなく、 'multiprocessing.Pool'の' map() 'メソッドを使ってみませんか? –
ちょっと考えましたが、共有配列を作成する前に 'Process'-es *を作成しています。 'Process'-esが実際に共有配列にアクセスできることは確かですか?彼らはただコピーを得ることができるかもしれません... –
'shared_array_base'は、ターゲット' compute'メソッドへの引数として渡されるべきです。実際には、POSIXシステムでは 'fork'を介して継承する必要がありますが、Windowsサポートでは、' mmap'共有メモリの名前と状態を子プロセスにピクルさせパイプすることができるようにする必要があります。各ワーカープロセスで共有配列をNumPy配列としてラップすることができます。あなたが現在やっているように、各作業員にコピーをピケッとパイプしないでください。 – eryksun